Hadoop YARN / YARN-11176

Refactor TestAggregatedLogDeletionService

Details

    • Reviewed

    Description

      The code of TestAggregatedLogDeletionService is quite messy.
      Some refactoring could make it more readable and easier to understand.
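A minimal sketch of the kind of helper extraction such a refactoring might involve. All names here (`HelperExtractionSketch`, `StubStatus`, `PathAndStatus`, `createDir`) are hypothetical stand-ins written in plain Java, not Hadoop classes and not necessarily what the patch does. The idea is that a test which repeatedly builds a path and then a matching status object can delegate both steps to one small factory method.

```java
/** Illustrative sketch only; none of these types come from Hadoop. */
class HelperExtractionSketch {

  /** Hypothetical stand-in for a file-status object. */
  static final class StubStatus {
    final String path;
    final boolean directory;
    final long modificationTime;

    StubStatus(String path, boolean directory, long modificationTime) {
      this.path = path;
      this.directory = directory;
      this.modificationTime = modificationTime;
    }
  }

  /** Hypothetical holder that keeps a path and its status together. */
  static final class PathAndStatus {
    final String path;
    final StubStatus status;

    PathAndStatus(String path, StubStatus status) {
      this.path = path;
      this.status = status;
    }
  }

  /** One helper replaces the repeated "build path, then build status" pairs. */
  static PathAndStatus createDir(String baseDir, String child, long modificationTime) {
    String path = baseDir.endsWith("/") ? baseDir + child : baseDir + "/" + child;
    return new PathAndStatus(path, new StubStatus(path, true, modificationTime));
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Sample values are arbitrary; 1500 seconds in the past.
    PathAndStatus userDir = createDir("mockfs://foo/tmp/logs", "me", now - 1500 * 1000L);
    System.out.println(userDir.path + " directory=" + userDir.status.directory);
  }
}
```

Keeping the path and its status in one object means each test fixture is declared once and both halves stay in sync.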

          Activity

            githubbot ASF GitHub Bot logged work - 11/Jun/22 15:06
            • Time Spent:
              10m
               
              szilard-nemeth opened a new pull request, #4430:
              URL: https://github.com/apache/hadoop/pull/4430

                 …s to variables<!--
                   Thanks for sending a pull request!
                     1. If this is your first time, please read our contributor guidelines: https://cwiki.apache.org/confluence/display/HADOOP/How+To+Contribute
                     2. Make sure your PR title starts with JIRA issue id, e.g., 'HADOOP-17799. Your PR title ...'.
                 -->
                 
                 ### Description of PR
                 
                 
                 ### How was this patch tested?
                 
                 
                 ### For code changes:
                 
                 - [ ] Does the title of this PR start with the corresponding JIRA issue id (e.g. 'HADOOP-17799. Your PR title ...')?
                 - [ ] Object storage: have the integration tests been executed and the endpoint declared according to the connector-specific documentation?
                 - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
                 - [ ] If applicable, have you updated the `LICENSE`, `LICENSE-binary`, `NOTICE-binary` files?
                 
                 


            githubbot ASF GitHub Bot logged work - 11/Jun/22 16:46
            • Time Spent:
              10m
               
              hadoop-yetus commented on PR #4430:
              URL: https://github.com/apache/hadoop/pull/4430#issuecomment-1152962260

                 :broken_heart: **-1 overall**
                 
                 
                 
                 
                 
                 
                 | Vote | Subsystem | Runtime | Logfile | Comment |
                 |:----:|----------:|--------:|:--------:|:-------:|
                 | +0 :ok: | reexec | 0m 46s | | Docker mode activated. |
                 |||| _ Prechecks _ |
                 | +1 :green_heart: | dupname | 0m 1s | | No case conflicting files found. |
                 | +0 :ok: | codespell | 0m 0s | | codespell was not available. |
                 | +0 :ok: | detsecrets | 0m 0s | | detect-secrets was not available. |
                 | +1 :green_heart: | @author | 0m 0s | | The patch does not contain any @author tags. |
                 | +1 :green_heart: | test4tests | 0m 0s | | The patch appears to include 1 new or modified test files. |
                 |||| _ trunk Compile Tests _ |
                 | +1 :green_heart: | mvninstall | 37m 55s | | trunk passed |
                 | +1 :green_heart: | compile | 0m 47s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | compile | 0m 41s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | checkstyle | 0m 37s | | trunk passed |
                 | +1 :green_heart: | mvnsite | 0m 48s | | trunk passed |
                 | +1 :green_heart: | javadoc | 0m 58s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | javadoc | 0m 43s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | spotbugs | 1m 52s | | trunk passed |
                 | +1 :green_heart: | shadedclient | 21m 2s | | branch has no errors when building and testing our client artifacts. |
                 |||| _ Patch Compile Tests _ |
                 | +1 :green_heart: | mvninstall | 0m 40s | | the patch passed |
                 | +1 :green_heart: | compile | 0m 43s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | javac | 0m 43s | | the patch passed |
                 | +1 :green_heart: | compile | 0m 37s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | javac | 0m 37s | | the patch passed |
                 | -1 :x: | blanks | 0m 0s | [/blanks-eol.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/artifact/out/blanks-eol.txt) | The patch has 6 line(s) that end in blanks. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply |
                 | -0 :warning: | checkstyle | 0m 24s | [/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/artifact/out/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt) | hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common: The patch generated 47 new + 1 unchanged - 11 fixed = 48 total (was 12) |
                 | +1 :green_heart: | mvnsite | 0m 40s | | the patch passed |
                 | +1 :green_heart: | javadoc | 0m 38s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | javadoc | 0m 35s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | spotbugs | 1m 44s | | the patch passed |
                 | +1 :green_heart: | shadedclient | 21m 8s | | patch has no errors when building and testing our client artifacts. |
                 |||| _ Other Tests _ |
                 | +1 :green_heart: | unit | 4m 47s | | hadoop-yarn-common in the patch passed. |
                 | +1 :green_heart: | asflicense | 0m 35s | | The patch does not generate ASF License warnings. |
                 | | | 98m 31s | | |
                 
                 
                 | Subsystem | Report/Notes |
                 |----------:|:-------------|
                 | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/artifact/out/Dockerfile |
                 | GITHUB PR | https://github.com/apache/hadoop/pull/4430 |
                 | Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell detsecrets |
                 | uname | Linux 0a0262b033c7 4.15.0-169-generic #177-Ubuntu SMP Thu Feb 3 10:50:38 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux |
                 | Build tool | maven |
                 | Personality | dev-support/bin/hadoop.sh |
                 | git revision | trunk / 0ed1689e94253b84d3fef968d41d9d6124e121a1 |
                 | Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/testReport/ |
                 | Max. process+thread count | 668 (vs. ulimit of 5500) |
                 | modules | C: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common U: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common |
                 | Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/console |
                 | versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
                 | Powered by | Apache Yetus 0.14.0 https://yetus.apache.org |
                 
                 
                 This message was automatically generated.
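The `blanks` row in the report above flags patch lines that end in whitespace. As a rough illustration of what that means, here is a minimal Java sketch that lists the 1-based numbers of lines ending in a space or tab. It is not how Yetus implements the check; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; not the Yetus implementation. */
class TrailingBlanksSketch {

  /** Returns the 1-based numbers of lines that end in a space or tab. */
  static List<Integer> linesEndingInBlanks(String text) {
    List<Integer> flagged = new ArrayList<>();
    String[] lines = text.split("\n", -1);
    for (int i = 0; i < lines.length; i++) {
      String line = lines[i];
      // Ignore a Windows-style carriage return before checking the line end.
      if (line.endsWith("\r")) {
        line = line.substring(0, line.length() - 1);
      }
      if (line.endsWith(" ") || line.endsWith("\t")) {
        flagged.add(i + 1);
      }
    }
    return flagged;
  }

  public static void main(String[] args) {
    String sample = "int a = 1;\nint b = 2; \n\t\nint c = 3;";
    System.out.println(linesEndingInBlanks(sample)); // prints [2, 3]
  }
}
```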
                 
                 


            githubbot ASF GitHub Bot logged work - 13/Jun/22 13:49
            • Time Spent:
              10m
               
              hadoop-yetus commented on PR #4430:
              URL: https://github.com/apache/hadoop/pull/4430#issuecomment-1153940178

                 :broken_heart: **-1 overall**
                 
                 
                 
                 
                 
                 
                 | Vote | Subsystem | Runtime | Logfile | Comment |
                 |:----:|----------:|--------:|:--------:|:-------:|
                 | +0 :ok: | reexec | 0m 36s | | Docker mode activated. |
                 |||| _ Prechecks _ |
                 | +1 :green_heart: | dupname | 0m 0s | | No case conflicting files found. |
                 | +0 :ok: | codespell | 0m 1s | | codespell was not available. |
                 | +0 :ok: | detsecrets | 0m 1s | | detect-secrets was not available. |
                 | +1 :green_heart: | @author | 0m 0s | | The patch does not contain any @author tags. |
                 | +1 :green_heart: | test4tests | 0m 0s | | The patch appears to include 1 new or modified test files. |
                 |||| _ trunk Compile Tests _ |
                 | +1 :green_heart: | mvninstall | 36m 47s | | trunk passed |
                 | +1 :green_heart: | compile | 1m 5s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | compile | 1m 1s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | checkstyle | 0m 58s | | trunk passed |
                 | +1 :green_heart: | mvnsite | 1m 6s | | trunk passed |
                 | +1 :green_heart: | javadoc | 1m 14s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | javadoc | 1m 4s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | spotbugs | 2m 6s | | trunk passed |
                 | +1 :green_heart: | shadedclient | 21m 14s | | branch has no errors when building and testing our client artifacts. |
                 |||| _ Patch Compile Tests _ |
                 | +1 :green_heart: | mvninstall | 0m 43s | | the patch passed |
                 | +1 :green_heart: | compile | 0m 48s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | javac | 0m 48s | | the patch passed |
                 | +1 :green_heart: | compile | 0m 44s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | javac | 0m 44s | | the patch passed |
                 | -1 :x: | blanks | 0m 0s | [/blanks-eol.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/artifact/out/blanks-eol.txt) | The patch has 5 line(s) that end in blanks. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply |
                 | -0 :warning: | checkstyle | 0m 33s | [/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/artifact/out/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt) | hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common: The patch generated 11 new + 1 unchanged - 11 fixed = 12 total (was 12) |
                 | +1 :green_heart: | mvnsite | 0m 46s | | the patch passed |
                 | +1 :green_heart: | javadoc | 0m 47s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
                 | +1 :green_heart: | javadoc | 0m 45s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | +1 :green_heart: | spotbugs | 1m 45s | | the patch passed |
                 | +1 :green_heart: | shadedclient | 20m 48s | | patch has no errors when building and testing our client artifacts. |
                 |||| _ Other Tests _ |
                 | +1 :green_heart: | unit | 4m 59s | | hadoop-yarn-common in the patch passed. |
                 | +1 :green_heart: | asflicense | 0m 50s | | The patch does not generate ASF License warnings. |
                 | | | 100m 57s | | |
                 
                 
                 | Subsystem | Report/Notes |
                 |----------:|:-------------|
                 | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/artifact/out/Dockerfile |
                 | GITHUB PR | https://github.com/apache/hadoop/pull/4430 |
                 | Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell detsecrets |
                 | uname | Linux 5c3a5eda045f 4.15.0-58-generic #64-Ubuntu SMP Tue Aug 6 11:12:41 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux |
                 | Build tool | maven |
                 | Personality | dev-support/bin/hadoop.sh |
                 | git revision | trunk / 899c014e5dfe1213887dc83f3a2a43d5beea601d |
                 | Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
                 | Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/testReport/ |
                 | Max. process+thread count | 652 (vs. ulimit of 5500) |
                 | modules | C: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common U: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common |
                 | Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/console |
                 | versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
                 | Powered by | Apache Yetus 0.14.0 https://yetus.apache.org |
                 
                 
                 This message was automatically generated.
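The checkstyle rows in the two reports can be read as simple bookkeeping: the total after the patch is the new issues plus the unchanged ones, and the earlier total is the unchanged issues plus the fixed ones. This reading is an assumption, but it is consistent with both sets of figures. A small Java sketch with hypothetical names checks the arithmetic.

```java
/** Illustrative sketch only; the interpretation of the counts is an assumption. */
class CheckstyleTallySketch {

  /** Issues present after the patch: newly introduced plus untouched ones. */
  static int totalAfter(int added, int unchanged) {
    return added + unchanged;
  }

  /** Issues present before the patch: untouched ones plus those the patch fixed. */
  static int totalBefore(int unchanged, int fixed) {
    return unchanged + fixed;
  }

  public static void main(String[] args) {
    // First build: 47 new + 1 unchanged - 11 fixed = 48 total (was 12)
    System.out.println(totalAfter(47, 1) + " total (was " + totalBefore(1, 11) + ")");
    // Second build: 11 new + 1 unchanged - 11 fixed = 12 total (was 12)
    System.out.println(totalAfter(11, 1) + " total (was " + totalBefore(1, 11) + ")");
  }
}
```

Under this reading the second build no longer increases the checkstyle count: it introduces as many issues as it fixes.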
                 
                 


            githubbot ASF GitHub Bot logged work - 14/Jun/22 11:58
            • Time Spent:
              10m
               
              9uapaw commented on code in PR #4430:
              URL: https://github.com/apache/hadoop/pull/4430#discussion_r896715900


              ##########
              hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java:
              ##########
              @@ -44,514 +44,391 @@
               import org.junit.Test;
               import org.junit.Assert;
               
              +import static org.junit.Assert.assertTrue;
               import static org.mockito.Mockito.*;
               
               public class TestAggregatedLogDeletionService {
              -
              +
              +  private static final String T_FILE = "TFile";
              +  private static final String USER_ME = "me";
              +  private static final String DIR_HOST1 = "host1";
              +  private static final String DIR_HOST2 = "host2";
              +
              +  private static final String ROOT = "mockfs://foo/";
              +  private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
              +  private static final String SUFFIX = "logs";
              +  private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
              +  private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
              +
              +  private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
              +      ApplicationId appId,
              +      String user, String suffix,
              +      long modificationTime) {
              +    Path path = LogAggregationUtils.getRemoteAppLogDir(
              +        remoteRootLogDir, appId, user, suffix);
              +    FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
              +    return new PathWithFileStatus(path, fileStatus);
              +  }
              +
              +  private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
              +    return new FileStatus(0, true, 0, 0, modificationTime, path);
              +  }
              +
              +  private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
              +      long modificationTime) {
              +    Path logPath = new Path(baseDir, childDir);
              +    FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
              +    return new PathWithFileStatus(logPath, fStatus);
              +  }
              +
              +  private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
              +      long modificationTime) {
              +    Path logPath = new Path(baseDir, childDir);
              +    FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
              +    return new PathWithFileStatus(logPath, fStatus);
              +  }
              +
              +  private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
              +      String user,
              +      String suffix,
              +      ApplicationId appId,
              +      long modificationTime) {
              +    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
              +    FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
              +    return new PathWithFileStatus(bucketDir, fStatus);
              +  }
              +
              +  private static FileStatus createFileStatusWithLengthForFile(long length,
              +      long modificationTime,
              +      Path logPath) {
              +    return new FileStatus(length, false, 1, 1, modificationTime, logPath);
              +  }
              +
              +  private static FileStatus createFileStatusWithLengthForDir(long length,
              +      long modificationTime,
              +      Path logPath) {
              +    return new FileStatus(length, true, 1, 1, modificationTime, logPath);
              +  }
              +
                 @Before
                 public void closeFilesystems() throws IOException {
                   // prevent the same mockfs instance from being reused due to FS cache
                   FileSystem.closeAll();
                 }
               
              +  private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
              +    Configuration conf = new Configuration();
              +    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              +    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
              +    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
              +    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              +        retainCheckIntervalSeconds);
              +    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
              +    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
              +    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
              +    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
              +        LogAggregationTFileController.class.getName());
              +    return conf;
              +  }
              +
                 @Test
                 public void testDeletion() throws Exception {
                   long now = System.currentTimeMillis();
              - long toDeleteTime = now - (2000*1000);
              - long toKeepTime = now - (1500*1000);
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root+"tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - final Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              + long toDeleteTime = now - (2000 * 1000);
              + long toKeepTime = now - (1500 * 1000);
               
              + Configuration conf = setupConfiguration(1800, -1);
               
              - Path rootPath = new Path(root);
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
                   
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir);
              -
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[]{userDirStatus});
              -
              - ApplicationId appId1 =
              - ApplicationId.newInstance(now, 1);
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus suffixDirStatus = new FileStatus(0, true,
              - 0, 0, toDeleteTime, suffixDir);
              - Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
              - remoteRootLogPath, "me", suffix, appId1);
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0,
              - 0, toDeleteTime, bucketDir);
              - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId1, "me", suffix);
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
              - toDeleteTime, app1Dir);
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
              + toKeepTime);
              +
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
              +
              + ApplicationId appId1 = ApplicationId.newInstance(now, 1);
              + ApplicationId appId2 = ApplicationId.newInstance(now, 2);
              + ApplicationId appId3 = ApplicationId.newInstance(now, 3);
              + ApplicationId appId4 = ApplicationId.newInstance(now, 4);
              +
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
              + toDeleteTime);
              + PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
              + toDeleteTime);
              +
              + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
              + USER_ME, SUFFIX, toDeleteTime);
              + PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
              + USER_ME, SUFFIX, toDeleteTime);
              + PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
              + USER_ME, SUFFIX, toDeleteTime);
              + PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
              + USER_ME, SUFFIX, toDeleteTime);
              +
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
              + app1.fileStatus, app2.fileStatus,
              + app3.fileStatus, app4.fileStatus});
                   
              - ApplicationId appId2 =
              - ApplicationId.newInstance(now, 2);
              - Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId2, "me", suffix);
              - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
              - toDeleteTime, app2Dir);
              -
              - ApplicationId appId3 =
              - ApplicationId.newInstance(now, 3);
              - Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId3, "me", suffix);
              - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
              - toDeleteTime, app3Dir);
              -
              - ApplicationId appId4 =
              - ApplicationId.newInstance(now, 4);
              - Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId4, "me", suffix);
              - FileStatus app4DirStatus =
              - new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
              -
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixDirStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus});
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus, app2DirStatus,
              - app3DirStatus, app4DirStatus});
              -
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[]{});
              -
              -
              - Path app2Log1 = new Path(app2Dir, "host1");
              - FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
              -
              - Path app2Log2 = new Path(app2Dir, "host2");
              - FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
              -
              - when(mockFs.listStatus(app2Dir)).thenReturn(
              - new FileStatus[]{app2Log1Status, app2Log2Status});
              -
              - Path app3Log1 = new Path(app3Dir, "host1");
              - FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
              -
              - Path app3Log2 = new Path(app3Dir, "host2");
              - FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
              -
              - when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
              -
              - when(mockFs.listStatus(app3Dir)).thenReturn(
              - new FileStatus[]{app3Log1Status, app3Log2Status});
              -
              - Path app4Log1 = new Path(app4Dir, "host1");
              - FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
              -
              - Path app4Log2 = new Path(app4Dir, "host2");
              - FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
              - toKeepTime, app4Log2);
              -
              - when(mockFs.listStatus(app4Dir)).thenReturn(
              - new FileStatus[]{app4Log1Status, app4Log2Status});
              -
              - final List<ApplicationId> finishedApplications =
              - Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
              - final List<ApplicationId> runningApplications =
              - Collections.unmodifiableList(Arrays.asList(appId4));
              + PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
              + toDeleteTime);
              + PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2,
              + toKeepTime);
              + PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
              + toDeleteTime);
              + PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
              + toDeleteTime);
              + PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
              + toDeleteTime);
              + PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
              +
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
              + when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
              + app2Log2.fileStatus});
              + when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
              + app3Log2.fileStatus});
              + when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
              + app4Log2.fileStatus});
              + when(mockFs.delete(app3.path, true)).thenThrow(
              + new AccessControlException("Injected Error\nStack Trace :("));
              +
              + final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
              + Arrays.asList(appId1, appId2, appId3));
              + final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
               
                   AggregatedLogDeletionService deletionService =
              - new AggregatedLogDeletionService() {
              - @Override
              - protected ApplicationClientProtocol createRMClient()
              - throws IOException {
              - try {
              - return createMockRMClient(finishedApplications,
              - runningApplications);
              - } catch (Exception e) {
              - throw new IOException(e);
              - }
              - }
              - @Override
              - protected void stopRMClient() {
              - // DO NOTHING
              - }
              - };
              + new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
                   deletionService.init(conf);
                   deletionService.start();
               
              - verify(mockFs, timeout(2000)).delete(app1Dir, true);
              - verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
              - verify(mockFs, timeout(2000)).delete(app3Dir, true);
              - verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
              - verify(mockFs, timeout(2000)).delete(app4Log1, true);
              - verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
              + int timeout = 2000;
              + verify(mockFs, timeout(timeout)).delete(app1.path, true);
              + verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
              + verify(mockFs, timeout(timeout)).delete(app3.path, true);
              + verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
              + verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
              + verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
               
                   deletionService.stop();
                 }
               
                 @Test
                 public void testRefreshLogRetentionSettings() throws Exception {
                   long now = System.currentTimeMillis();
              - //time before 2000 sec
                   long before2000Secs = now - (2000 * 1000);
              - //time before 50 sec
                   long before50Secs = now - (50 * 1000);
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root + "tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - final Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              - "1");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              -
              -
              - Path rootPath = new Path(root);
              + int checkIntervalSeconds = 2;
              + int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
              +
              + Configuration conf = setupConfiguration(1800, 1);
              +
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
               
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
              - userDir);
              + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
              + ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
              +
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
               
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[] { userDirStatus });
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
              + before50Secs);
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
              + before50Secs);
              + PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
              + USER_ME, SUFFIX, appId1, before50Secs);
               
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
              - suffixDir);
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
               
              - ApplicationId appId1 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 1);
                   //Set time last modified of app1Dir directory and its files to before2000Secs
              - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId1, "me", suffix);
              - Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
              - remoteRootLogPath, "me", suffix, appId1);
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0,
              - 0, before50Secs, bucketDir);
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
              - app1Dir);
              -
              - ApplicationId appId2 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 2);
              - //Set time last modified of app1Dir directory and its files to before50Secs
              - Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId2, "me", suffix);
              - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
              - app2Dir);
              + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
              + USER_ME, SUFFIX, before2000Secs);
               
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixStatus });
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus });
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus, app2DirStatus });
              -
              - Path app1Log1 = new Path(app1Dir, "host1");
              - FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
              - app1Log1);
              + //Set time last modified of app1Dir directory and its files to before50Secs
              + PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
              + USER_ME, SUFFIX, before50Secs);
               
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[] { app1Log1Status });
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
              + app2.fileStatus});
               
              - Path app2Log1 = new Path(app2Dir, "host1");
              - FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
              - app2Log1);
              + PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
              + before2000Secs);
              + PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
              + before50Secs);
               
              - when(mockFs.listStatus(app2Dir)).thenReturn(
              - new FileStatus[] { app2Log1Status });
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
              + when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
               
                   final List<ApplicationId> finishedApplications =
                       Collections.unmodifiableList(Arrays.asList(appId1, appId2));
               
              - AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
              - @Override
              - protected Configuration createConf() {
              - return conf;
              - }
              - @Override
              - protected ApplicationClientProtocol createRMClient()
              - throws IOException {
              - try {
              - return createMockRMClient(finishedApplications, null);
              - } catch (Exception e) {
              - throw new IOException(e);
              - }
              - }
              - @Override
              - protected void stopRMClient() {
              - // DO NOTHING
              - }
              - };
              + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
              + finishedApplications, conf);
                   
                   deletionSvc.init(conf);
                   deletionSvc.start();
                   
                   //app1Dir would be deleted since its done above log retention period
              - verify(mockFs, timeout(10000)).delete(app1Dir, true);
              - //app2Dir is not expected to be deleted since its below the threshold
              - verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
              -
              - //Now,lets change the confs
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              - "2");
              + verify(mockFs, timeout(10000)).delete(app1.path, true);
              + //app2Dir is not expected to be deleted since it is below the threshold
              + verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
              +
              + //Now, let's change the confs
              + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
              + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              + checkIntervalSeconds);
                   //We have not called refreshLogSettings,hence don't expect to see the changed conf values
              - Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
              + assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
                   
                   //refresh the log settings
                   deletionSvc.refreshLogRetentionSettings();
               
                   //Check interval time should reflect the new value
              - Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
              + Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
                   //app2Dir should be deleted since it falls above the threshold
              - verify(mockFs, timeout(10000)).delete(app2Dir, true);
              + verify(mockFs, timeout(10000)).delete(app2.path, true);
                   deletionSvc.stop();
                 }
                 
                 @Test
                 public void testCheckInterval() throws Exception {
              - long RETENTION_SECS = 10 * 24 * 3600;
                   long now = System.currentTimeMillis();
              - long toDeleteTime = now - RETENTION_SECS*1000;
              -
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root+"tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              + long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000;
               
              + Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
               
                   // prevent us from picking up the same mockfs instance from another test
                   FileSystem.closeAll();
              - Path rootPath = new Path(root);
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
               
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
               
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[]{userDirStatus});
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
               
              - ApplicationId appId1 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 1);
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
              - suffixDir);
              - Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
              - remoteRootLogPath, "me", suffix, appId1);
              - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId1, "me", suffix);
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0,
              - 0, now, bucketDir);
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
               
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
              + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
              + PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
              + USER_ME, SUFFIX, appId1, now);
               
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixDirStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus});
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus});
              + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
              + USER_ME, SUFFIX, now);
              + PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
               
              - Path app1Log1 = new Path(app1Dir, "host1");
              - FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
               
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[]{app1Log1Status});
              + final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
               
              - final List<ApplicationId> finishedApplications =
              - Collections.unmodifiableList(Arrays.asList(appId1));
              -
              - AggregatedLogDeletionService deletionSvc =
              - new AggregatedLogDeletionService() {
              - @Override
              - protected ApplicationClientProtocol createRMClient()
              - throws IOException {
              - try {
              - return createMockRMClient(finishedApplications, null);
              - } catch (Exception e) {
              - throw new IOException(e);
              - }
              - }
              - @Override
              - protected void stopRMClient() {
              - // DO NOTHING
              - }
              - };
              + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
              + finishedApplications);
                   deletionSvc.init(conf);
                   deletionSvc.start();
                
                   verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
              - verify(mockFs, never()).delete(app1Dir, true);
              -
              - // modify the timestamp of the logs and verify it's picked up quickly
              - bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
              - app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
              - app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixDirStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus });
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus });
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[]{app1Log1Status});
              -
              - verify(mockFs, timeout(10000)).delete(app1Dir, true);
              + verify(mockFs, never()).delete(app1.path, true);
              +
              + // modify the timestamp of the logs and verify if it's picked up quickly
              + app1.changeModificationTime(toDeleteTime);
              + app1Log1.changeModificationTime(toDeleteTime);
              + bucketDir.changeModificationTime(toDeleteTime);
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
              +
              + verify(mockFs, timeout(10000)).delete(app1.path, true);
               
                   deletionSvc.stop();
                 }
               
                 @Test
                 public void testRobustLogDeletion() throws Exception {
              - final long RETENTION_SECS = 10 * 24 * 3600;
              -
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root+"tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class,
              - FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              - "1");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              + Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
               
                   // prevent us from picking up the same mockfs instance from another test
                   FileSystem.closeAll();
              - Path rootPath = new Path(root);
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
               
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
              - FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
              - Path bucketDir = new Path(suffixDir, String.valueOf(0));
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
              -
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[]{userDirStatus});
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[]{suffixStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[]{bucketDirStatus});
              -
              - ApplicationId appId1 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 1);
              - Path app1Dir = new Path(bucketDir, appId1.toString());
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
              - ApplicationId appId2 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 2);
              - Path app2Dir = new Path(bucketDir, "application_a");
              - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
              - ApplicationId appId3 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 3);
              - Path app3Dir = new Path(bucketDir, appId3.toString());
              - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
              -
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
              - when(mockFs.listStatus(app2Dir)).thenReturn(
              - new FileStatus[]{});
              -
              - when(mockFs.listStatus(app1Dir)).thenThrow(
              - new RuntimeException("Should Be Caught and Logged"));
              - Path app3Log3 = new Path(app3Dir, "host1");
              - FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
              - when(mockFs.listStatus(app3Dir)).thenReturn(
              - new FileStatus[]{app3Log3Status});
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
               
              - final List<ApplicationId> finishedApplications =
              - Collections.unmodifiableList(Arrays.asList(appId1, appId3));
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
              + PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
              +
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
               
              - ApplicationClientProtocol rmClient =
              - createMockRMClient(finishedApplications, null);
              + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
              + ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
              + ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
              +
              + PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
              + PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);

              Review Comment:
                  Is this intentionally set to "application_a" instead of being derived from appId2?
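
For context on the question above: the removed lines of this same diff also build the second application directory from the literal name (`Path app2Dir = new Path(bucketDir, "application_a");`), so the refactoring appears to keep the existing behavior rather than introduce it. One plausible reading, which is an assumption and is not confirmed anywhere in this thread, is that `testRobustLogDeletion` wants a directory whose name does not parse as an application ID. The minimal sketch below illustrates that distinction with plain Java only. The class `AppDirNames`, the method `isWellFormedAppId`, and the regex are hypothetical stand-ins and are not Hadoop's own parsing code; the assumed ID shape is `application_<clusterTimestamp>_<sequence>`.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Hadoop or of this PR.
public class AppDirNames {

    // Assumed textual shape of an application ID:
    // "application_" + numeric cluster timestamp + "_" + numeric sequence number.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /** Returns true when the directory name matches the assumed application ID shape. */
    public static boolean isWellFormedAppId(String dirName) {
        return dirName != null && APP_ID.matcher(dirName).matches();
    }

    public static void main(String[] args) {
        // A name in the assumed application ID shape.
        System.out.println(isWellFormedAppId("application_1655000000000_0001")); // prints "true"
        // The literal used for app2 in the test; it has no numeric parts.
        System.out.println(isWellFormedAppId("application_a"));                  // prints "false"
    }
}
```

If that reading is right, a short comment next to the `"application_a"` literal in the test would make the intent explicit for future readers.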



            githubbot ASF GitHub Bot logged work - 14/Jun/22 12:35
            • Time Spent:
              10m
               
              szilard-nemeth commented on code in PR #4430:
              URL: https://github.com/apache/hadoop/pull/4430#discussion_r896760612


              ##########
              hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java:
              ##########
              @@ -44,514 +44,391 @@
               import org.junit.Test;
               import org.junit.Assert;
               
              +import static org.junit.Assert.assertTrue;
               import static org.mockito.Mockito.*;
               
               public class TestAggregatedLogDeletionService {
              -
              +
              + private static final String T_FILE = "TFile";
              + private static final String USER_ME = "me";
              + private static final String DIR_HOST1 = "host1";
              + private static final String DIR_HOST2 = "host2";
              +
              + private static final String ROOT = "mockfs://foo/";
              + private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
              + private static final String SUFFIX = "logs";
              + private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
              + private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
              +
              + private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
              + ApplicationId appId,
              + String user, String suffix,
              + long modificationTime) {
              + Path path = LogAggregationUtils.getRemoteAppLogDir(
              + remoteRootLogDir, appId, user, suffix);
              + FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
              + return new PathWithFileStatus(path, fileStatus);
              + }
              +
              + private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
              + return new FileStatus(0, true, 0, 0, modificationTime, path);
              + }
              +
              + private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
              + long modificationTime) {
              + Path logPath = new Path(baseDir, childDir);
              + FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
              + return new PathWithFileStatus(logPath, fStatus);
              + }
              +
              + private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
              + long modificationTime) {
              + Path logPath = new Path(baseDir, childDir);
              + FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
              + return new PathWithFileStatus(logPath, fStatus);
              + }
              +
              + private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
              + String user,
              + String suffix,
              + ApplicationId appId,
              + long modificationTime) {
              + Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
              + FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
              + return new PathWithFileStatus(bucketDir, fStatus);
              + }
              +
              + private static FileStatus createFileStatusWithLengthForFile(long length,
              + long modificationTime,
              + Path logPath) {
              + return new FileStatus(length, false, 1, 1, modificationTime, logPath);
              + }
              +
              + private static FileStatus createFileStatusWithLengthForDir(long length,
              + long modificationTime,
              + Path logPath) {
              + return new FileStatus(length, true, 1, 1, modificationTime, logPath);
              + }
              +
                 @Before
                 public void closeFilesystems() throws IOException {
                   // prevent the same mockfs instance from being reused due to FS cache
                   FileSystem.closeAll();
                 }
               
              + private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
              + Configuration conf = new Configuration();
              + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
              + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
              + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              + retainCheckIntervalSeconds);
              + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
              + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
              + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
              + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
              + LogAggregationTFileController.class.getName());
              + return conf;
              + }
              +
                 @Test
                 public void testDeletion() throws Exception {
                   long now = System.currentTimeMillis();
              - long toDeleteTime = now - (2000*1000);
              - long toKeepTime = now - (1500*1000);
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root+"tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - final Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              + long toDeleteTime = now - (2000 * 1000);
              + long toKeepTime = now - (1500 * 1000);
               
              + Configuration conf = setupConfiguration(1800, -1);
               
              - Path rootPath = new Path(root);
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
                   
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir);
              -
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[]{userDirStatus});
              -
              - ApplicationId appId1 =
              - ApplicationId.newInstance(now, 1);
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus suffixDirStatus = new FileStatus(0, true,
              - 0, 0, toDeleteTime, suffixDir);
              - Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
              - remoteRootLogPath, "me", suffix, appId1);
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0,
              - 0, toDeleteTime, bucketDir);
              - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId1, "me", suffix);
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
              - toDeleteTime, app1Dir);
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
              + toKeepTime);
              +
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
              +
              + ApplicationId appId1 = ApplicationId.newInstance(now, 1);
              + ApplicationId appId2 = ApplicationId.newInstance(now, 2);
              + ApplicationId appId3 = ApplicationId.newInstance(now, 3);
              + ApplicationId appId4 = ApplicationId.newInstance(now, 4);
              +
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
              + toDeleteTime);
              + PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
              + toDeleteTime);
              +
              + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
              + USER_ME, SUFFIX, toDeleteTime);
              + PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
              + USER_ME, SUFFIX, toDeleteTime);
              + PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
              + USER_ME, SUFFIX, toDeleteTime);
              + PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
              + USER_ME, SUFFIX, toDeleteTime);
              +
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
              + app1.fileStatus, app2.fileStatus,
              + app3.fileStatus, app4.fileStatus});
                   
              - ApplicationId appId2 =
              - ApplicationId.newInstance(now, 2);
              - Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId2, "me", suffix);
              - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
              - toDeleteTime, app2Dir);
              -
              - ApplicationId appId3 =
              - ApplicationId.newInstance(now, 3);
              - Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId3, "me", suffix);
              - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
              - toDeleteTime, app3Dir);
              -
              - ApplicationId appId4 =
              - ApplicationId.newInstance(now, 4);
              - Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId4, "me", suffix);
              - FileStatus app4DirStatus =
              - new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
              -
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixDirStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus});
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus, app2DirStatus,
              - app3DirStatus, app4DirStatus});
              -
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[]{});
              -
              -
              - Path app2Log1 = new Path(app2Dir, "host1");
              - FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
              -
              - Path app2Log2 = new Path(app2Dir, "host2");
              - FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
              -
              - when(mockFs.listStatus(app2Dir)).thenReturn(
              - new FileStatus[]{app2Log1Status, app2Log2Status});
              -
              - Path app3Log1 = new Path(app3Dir, "host1");
              - FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
              -
              - Path app3Log2 = new Path(app3Dir, "host2");
              - FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
              -
              - when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
              -
              - when(mockFs.listStatus(app3Dir)).thenReturn(
              - new FileStatus[]{app3Log1Status, app3Log2Status});
              -
              - Path app4Log1 = new Path(app4Dir, "host1");
              - FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
              -
              - Path app4Log2 = new Path(app4Dir, "host2");
              - FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
              - toKeepTime, app4Log2);
              -
              - when(mockFs.listStatus(app4Dir)).thenReturn(
              - new FileStatus[]{app4Log1Status, app4Log2Status});
              -
              - final List<ApplicationId> finishedApplications =
              - Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
              - final List<ApplicationId> runningApplications =
              - Collections.unmodifiableList(Arrays.asList(appId4));
              + PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
              + toDeleteTime);
              + PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2,
              + toKeepTime);
              + PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
              + toDeleteTime);
              + PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
              + toDeleteTime);
              + PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
              + toDeleteTime);
              + PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
              +
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
              + when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
              + app2Log2.fileStatus});
              + when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
              + app3Log2.fileStatus});
              + when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
              + app4Log2.fileStatus});
              + when(mockFs.delete(app3.path, true)).thenThrow(
              + new AccessControlException("Injected Error\nStack Trace :("));
              +
              + final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
              + Arrays.asList(appId1, appId2, appId3));
              + final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
               
                   AggregatedLogDeletionService deletionService =
              - new AggregatedLogDeletionService() {
              - @Override
              - protected ApplicationClientProtocol createRMClient()
              - throws IOException {
              - try {
              - return createMockRMClient(finishedApplications,
              - runningApplications);
              - } catch (Exception e) {
              - throw new IOException(e);
              - }
              - }
              - @Override
              - protected void stopRMClient() {
              - // DO NOTHING
              - }
              - };
              + new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
                   deletionService.init(conf);
                   deletionService.start();
               
              - verify(mockFs, timeout(2000)).delete(app1Dir, true);
              - verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
              - verify(mockFs, timeout(2000)).delete(app3Dir, true);
              - verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
              - verify(mockFs, timeout(2000)).delete(app4Log1, true);
              - verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
              + int timeout = 2000;
              + verify(mockFs, timeout(timeout)).delete(app1.path, true);
              + verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
              + verify(mockFs, timeout(timeout)).delete(app3.path, true);
              + verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
              + verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
              + verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
               
                   deletionService.stop();
                 }
               
                 @Test
                 public void testRefreshLogRetentionSettings() throws Exception {
                   long now = System.currentTimeMillis();
              - //time before 2000 sec
                   long before2000Secs = now - (2000 * 1000);
              - //time before 50 sec
                   long before50Secs = now - (50 * 1000);
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root + "tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - final Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              - "1");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              -
              -
              - Path rootPath = new Path(root);
              + int checkIntervalSeconds = 2;
              + int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
              +
              + Configuration conf = setupConfiguration(1800, 1);
              +
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
               
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
              - userDir);
              + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
              + ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
              +
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
               
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[] { userDirStatus });
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
              + before50Secs);
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
              + before50Secs);
              + PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
              + USER_ME, SUFFIX, appId1, before50Secs);
               
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
              - suffixDir);
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
               
              - ApplicationId appId1 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 1);
                   //Set time last modified of app1Dir directory and its files to before2000Secs
              - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId1, "me", suffix);
              - Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
              - remoteRootLogPath, "me", suffix, appId1);
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0,
              - 0, before50Secs, bucketDir);
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
              - app1Dir);
              -
              - ApplicationId appId2 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 2);
              - //Set time last modified of app1Dir directory and its files to before50Secs
              - Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId2, "me", suffix);
              - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
              - app2Dir);
              + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
              + USER_ME, SUFFIX, before2000Secs);
               
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixStatus });
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus });
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus, app2DirStatus });
              -
              - Path app1Log1 = new Path(app1Dir, "host1");
              - FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
              - app1Log1);
              + //Set time last modified of app1Dir directory and its files to before50Secs
              + PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
              + USER_ME, SUFFIX, before50Secs);
               
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[] { app1Log1Status });
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
              + app2.fileStatus});
               
              - Path app2Log1 = new Path(app2Dir, "host1");
              - FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
              - app2Log1);
              + PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
              + before2000Secs);
              + PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
              + before50Secs);
               
              - when(mockFs.listStatus(app2Dir)).thenReturn(
              - new FileStatus[] { app2Log1Status });
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
              + when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
               
                   final List<ApplicationId> finishedApplications =
                       Collections.unmodifiableList(Arrays.asList(appId1, appId2));
               
              - AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
              - @Override
              - protected Configuration createConf() {
              - return conf;
              - }
              - @Override
              - protected ApplicationClientProtocol createRMClient()
              - throws IOException {
              - try {
              - return createMockRMClient(finishedApplications, null);
              - } catch (Exception e) {
              - throw new IOException(e);
              - }
              - }
              - @Override
              - protected void stopRMClient() {
              - // DO NOTHING
              - }
              - };
              + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
              + finishedApplications, conf);
                   
                   deletionSvc.init(conf);
                   deletionSvc.start();
                   
                   //app1Dir would be deleted since its done above log retention period
              - verify(mockFs, timeout(10000)).delete(app1Dir, true);
              - //app2Dir is not expected to be deleted since its below the threshold
              - verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
              -
              - //Now,lets change the confs
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              - "2");
              + verify(mockFs, timeout(10000)).delete(app1.path, true);
              + //app2Dir is not expected to be deleted since it is below the threshold
              + verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
              +
              + //Now, let's change the confs
              + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
              + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              + checkIntervalSeconds);
                   //We have not called refreshLogSettings,hence don't expect to see the changed conf values
              - Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
              + assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
                   
                   //refresh the log settings
                   deletionSvc.refreshLogRetentionSettings();
               
                   //Check interval time should reflect the new value
              - Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
              + Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
                   //app2Dir should be deleted since it falls above the threshold
              - verify(mockFs, timeout(10000)).delete(app2Dir, true);
              + verify(mockFs, timeout(10000)).delete(app2.path, true);
                   deletionSvc.stop();
                 }
                 
                 @Test
                 public void testCheckInterval() throws Exception {
              - long RETENTION_SECS = 10 * 24 * 3600;
                   long now = System.currentTimeMillis();
              - long toDeleteTime = now - RETENTION_SECS*1000;
              -
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root+"tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              + long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000;
               
              + Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
               
                   // prevent us from picking up the same mockfs instance from another test
                   FileSystem.closeAll();
              - Path rootPath = new Path(root);
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
               
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
               
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[]{userDirStatus});
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
               
              - ApplicationId appId1 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 1);
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
              - suffixDir);
              - Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
              - remoteRootLogPath, "me", suffix, appId1);
              - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
              - remoteRootLogPath, appId1, "me", suffix);
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0,
              - 0, now, bucketDir);
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
               
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
              + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
              + PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
              + USER_ME, SUFFIX, appId1, now);
               
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixDirStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus});
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus});
              + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
              + USER_ME, SUFFIX, now);
              + PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
               
              - Path app1Log1 = new Path(app1Dir, "host1");
              - FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
               
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[]{app1Log1Status});
              + final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
               
              - final List<ApplicationId> finishedApplications =
              - Collections.unmodifiableList(Arrays.asList(appId1));
              -
              - AggregatedLogDeletionService deletionSvc =
              - new AggregatedLogDeletionService() {
              - @Override
              - protected ApplicationClientProtocol createRMClient()
              - throws IOException {
              - try {
              - return createMockRMClient(finishedApplications, null);
              - } catch (Exception e) {
              - throw new IOException(e);
              - }
              - }
              - @Override
              - protected void stopRMClient() {
              - // DO NOTHING
              - }
              - };
              + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
              + finishedApplications);
                   deletionSvc.init(conf);
                   deletionSvc.start();
                
                   verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
              - verify(mockFs, never()).delete(app1Dir, true);
              -
              - // modify the timestamp of the logs and verify it's picked up quickly
              - bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
              - app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
              - app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[] {suffixDirStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[] {bucketDirStatus });
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[] {app1DirStatus });
              - when(mockFs.listStatus(app1Dir)).thenReturn(
              - new FileStatus[]{app1Log1Status});
              -
              - verify(mockFs, timeout(10000)).delete(app1Dir, true);
              + verify(mockFs, never()).delete(app1.path, true);
              +
              + // modify the timestamp of the logs and verify if it's picked up quickly
              + app1.changeModificationTime(toDeleteTime);
              + app1Log1.changeModificationTime(toDeleteTime);
              + bucketDir.changeModificationTime(toDeleteTime);
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
              + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
              + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
              +
              + verify(mockFs, timeout(10000)).delete(app1.path, true);
               
                   deletionSvc.stop();
                 }
               
                 @Test
                 public void testRobustLogDeletion() throws Exception {
              - final long RETENTION_SECS = 10 * 24 * 3600;
              -
              - String root = "mockfs://foo/";
              - String remoteRootLogDir = root+"tmp/logs";
              - String suffix = "logs";
              - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
              - Configuration conf = new Configuration();
              - conf.setClass("fs.mockfs.impl", MockFileSystem.class,
              - FileSystem.class);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
              - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
              - "1");
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
              - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
              - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
              - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
              - LogAggregationTFileController.class.getName());
              + Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
               
                   // prevent us from picking up the same mockfs instance from another test
                   FileSystem.closeAll();
              - Path rootPath = new Path(root);
              + Path rootPath = new Path(ROOT);
                   FileSystem rootFs = rootPath.getFileSystem(conf);
                   FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
               
              - Path remoteRootLogPath = new Path(remoteRootLogDir);
              -
              - Path userDir = new Path(remoteRootLogPath, "me");
              - Path suffixDir = new Path(userDir, newSuffix);
              - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
              - FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
              - Path bucketDir = new Path(suffixDir, String.valueOf(0));
              - FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
              -
              - when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
              - new FileStatus[]{userDirStatus});
              - when(mockFs.listStatus(userDir)).thenReturn(
              - new FileStatus[]{suffixStatus});
              - when(mockFs.listStatus(suffixDir)).thenReturn(
              - new FileStatus[]{bucketDirStatus});
              -
              - ApplicationId appId1 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 1);
              - Path app1Dir = new Path(bucketDir, appId1.toString());
              - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
              - ApplicationId appId2 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 2);
              - Path app2Dir = new Path(bucketDir, "application_a");
              - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
              - ApplicationId appId3 =
              - ApplicationId.newInstance(System.currentTimeMillis(), 3);
              - Path app3Dir = new Path(bucketDir, appId3.toString());
              - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
              -
              - when(mockFs.listStatus(bucketDir)).thenReturn(
              - new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
              - when(mockFs.listStatus(app2Dir)).thenReturn(
              - new FileStatus[]{});
              -
              - when(mockFs.listStatus(app1Dir)).thenThrow(
              - new RuntimeException("Should Be Caught and Logged"));
              - Path app3Log3 = new Path(app3Dir, "host1");
              - FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
              - when(mockFs.listStatus(app3Dir)).thenReturn(
              - new FileStatus[]{app3Log3Status});
              + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
               
              - final List<ApplicationId> finishedApplications =
              - Collections.unmodifiableList(Arrays.asList(appId1, appId3));
              + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
              + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
              + PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
              +
              + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
              + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
              + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
               
              - ApplicationClientProtocol rmClient =
              - createMockRMClient(finishedApplications, null);
              + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
              + ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
              + ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
              +
              + PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
              + PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);

              Review Comment:
                 This was the same as in the original code, so I didn't want to alter the behaviour.
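
For readers without the full patch at hand, the holder pattern this diff relies on (`PathWithFileStatus`, `createDirLogPathWithFileStatus`, `changeModificationTime`) can be sketched roughly as below. This is a self-contained illustration, not the code from the PR: `StubStatus`, `PathWithStatus`, `createDir`, `createFile` and `join` are hypothetical names, plain strings stand in for Hadoop's `Path`, and it assumes that changing the modification time replaces the status object rather than mutating it.

```java
// Illustrative sketch only: StubStatus stands in for Hadoop's FileStatus,
// and plain Strings stand in for Path. None of this is the code from the PR.
final class PathWithStatusSketch {

  /** Immutable stand-in for a file status: path, directory flag, modification time. */
  static final class StubStatus {
    final String path;
    final boolean directory;
    final long modificationTime;

    StubStatus(String path, boolean directory, long modificationTime) {
      this.path = path;
      this.directory = directory;
      this.modificationTime = modificationTime;
    }
  }

  /** Pairs a path with its status so a test handles one object instead of two variables. */
  static final class PathWithStatus {
    final String path;
    StubStatus status;

    PathWithStatus(String path, StubStatus status) {
      this.path = path;
      this.status = status;
    }

    /** Swaps in a fresh status object (assumption: the old one is not mutated). */
    void changeModificationTime(long newTime) {
      status = new StubStatus(path, status.directory, newTime);
    }
  }

  /** Builds a directory entry under the given parent. */
  static PathWithStatus createDir(String parent, String name, long modTime) {
    String path = join(parent, name);
    return new PathWithStatus(path, new StubStatus(path, true, modTime));
  }

  /** Builds a regular-file entry under the given parent. */
  static PathWithStatus createFile(String parent, String name, long modTime) {
    String path = join(parent, name);
    return new PathWithStatus(path, new StubStatus(path, false, modTime));
  }

  private static String join(String parent, String name) {
    return parent.endsWith("/") ? parent + name : parent + "/" + name;
  }

  public static void main(String[] args) {
    PathWithStatus userDir = createDir("mockfs://foo/tmp/logs", "me", 1_000L);
    PathWithStatus log = createFile(userDir.path, "host1", 1_000L);
    log.changeModificationTime(500L);
    System.out.println(log.path + " modified at " + log.status.modificationTime);
  }
}
```

Under that assumption, a mock that was stubbed with the old status object keeps returning it, which would explain why the `when(mockFs.listStatus(...))` stubs are repeated after the `changeModificationTime` calls in `testCheckInterval`.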



            githubbot ASF GitHub Bot logged work - 14/Jun/22 14:15

            People

              snemeth Szilard Nemeth
              Votes: 0
              Watchers: 2

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 1h 10m