Details
- Improvement
- Status: Resolved
- Minor
- Resolution: Fixed
- 3.4.0
- Reviewed
Description
The code of TestAggregatedLogDeletionService is quite messy.
It could be refactored to make it more readable and easier to understand.
Issue Links
- Dependency
  - YARN-11182 Refactor TestAggregatedLogDeletionService: 2nd phase (Resolved)
- is a clone of
  - YARN-11175 Refactor LogAggregationFileControllerFactory (Resolved)
Activity
ASF GitHub Bot logged work - 11/Jun/22 15:06
- Time Spent: 10m
szilard-nemeth opened a new pull request, #4430:
URL: https://github.com/apache/hadoop/pull/4430
…s to variables
ASF GitHub Bot logged work - 11/Jun/22 16:46
- Time Spent: 10m
hadoop-yetus commented on PR #4430:
URL: https://github.com/apache/hadoop/pull/4430#issuecomment-1152962260
:broken_heart: **-1 overall**
| Vote | Subsystem | Runtime | Logfile | Comment |
|:----:|----------:|--------:|:--------:|:-------:|
| +0 :ok: | reexec | 0m 46s | | Docker mode activated. |
|||| _ Prechecks _ |
| +1 :green_heart: | dupname | 0m 1s | | No case conflicting files found. |
| +0 :ok: | codespell | 0m 0s | | codespell was not available. |
| +0 :ok: | detsecrets | 0m 0s | | detect-secrets was not available. |
| +1 :green_heart: | @author | 0m 0s | | The patch does not contain any @author tags. |
| +1 :green_heart: | test4tests | 0m 0s | | The patch appears to include 1 new or modified test files. |
|||| _ trunk Compile Tests _ |
| +1 :green_heart: | mvninstall | 37m 55s | | trunk passed |
| +1 :green_heart: | compile | 0m 47s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | compile | 0m 41s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | checkstyle | 0m 37s | | trunk passed |
| +1 :green_heart: | mvnsite | 0m 48s | | trunk passed |
| +1 :green_heart: | javadoc | 0m 58s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | javadoc | 0m 43s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | spotbugs | 1m 52s | | trunk passed |
| +1 :green_heart: | shadedclient | 21m 2s | | branch has no errors when building and testing our client artifacts. |
|||| _ Patch Compile Tests _ |
| +1 :green_heart: | mvninstall | 0m 40s | | the patch passed |
| +1 :green_heart: | compile | 0m 43s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | javac | 0m 43s | | the patch passed |
| +1 :green_heart: | compile | 0m 37s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | javac | 0m 37s | | the patch passed |
| -1 :x: | blanks | 0m 0s | [/blanks-eol.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/artifact/out/blanks-eol.txt) | The patch has 6 line(s) that end in blanks. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply |
| -0 :warning: | checkstyle | 0m 24s | [/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/artifact/out/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt) | hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common: The patch generated 47 new + 1 unchanged - 11 fixed = 48 total (was 12) |
| +1 :green_heart: | mvnsite | 0m 40s | | the patch passed |
| +1 :green_heart: | javadoc | 0m 38s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | javadoc | 0m 35s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | spotbugs | 1m 44s | | the patch passed |
| +1 :green_heart: | shadedclient | 21m 8s | | patch has no errors when building and testing our client artifacts. |
|||| _ Other Tests _ |
| +1 :green_heart: | unit | 4m 47s | | hadoop-yarn-common in the patch passed. |
| +1 :green_heart: | asflicense | 0m 35s | | The patch does not generate ASF License warnings. |
| | | 98m 31s | | |
| Subsystem | Report/Notes |
|----------:|:-------------|
| Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/artifact/out/Dockerfile |
| GITHUB PR | https://github.com/apache/hadoop/pull/4430 |
| Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell detsecrets |
| uname | Linux 0a0262b033c7 4.15.0-169-generic #177-Ubuntu SMP Thu Feb 3 10:50:38 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | dev-support/bin/hadoop.sh |
| git revision | trunk / 0ed1689e94253b84d3fef968d41d9d6124e121a1 |
| Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/testReport/ |
| Max. process+thread count | 668 (vs. ulimit of 5500) |
| modules | C: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common U: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common |
| Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/1/console |
| versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
| Powered by | Apache Yetus 0.14.0 https://yetus.apache.org |
This message was automatically generated.
ASF GitHub Bot logged work - 13/Jun/22 13:49
- Time Spent: 10m
hadoop-yetus commented on PR #4430:
URL: https://github.com/apache/hadoop/pull/4430#issuecomment-1153940178
:broken_heart: **-1 overall**
| Vote | Subsystem | Runtime | Logfile | Comment |
|:----:|----------:|--------:|:--------:|:-------:|
| +0 :ok: | reexec | 0m 36s | | Docker mode activated. |
|||| _ Prechecks _ |
| +1 :green_heart: | dupname | 0m 0s | | No case conflicting files found. |
| +0 :ok: | codespell | 0m 1s | | codespell was not available. |
| +0 :ok: | detsecrets | 0m 1s | | detect-secrets was not available. |
| +1 :green_heart: | @author | 0m 0s | | The patch does not contain any @author tags. |
| +1 :green_heart: | test4tests | 0m 0s | | The patch appears to include 1 new or modified test files. |
|||| _ trunk Compile Tests _ |
| +1 :green_heart: | mvninstall | 36m 47s | | trunk passed |
| +1 :green_heart: | compile | 1m 5s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | compile | 1m 1s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | checkstyle | 0m 58s | | trunk passed |
| +1 :green_heart: | mvnsite | 1m 6s | | trunk passed |
| +1 :green_heart: | javadoc | 1m 14s | | trunk passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | javadoc | 1m 4s | | trunk passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | spotbugs | 2m 6s | | trunk passed |
| +1 :green_heart: | shadedclient | 21m 14s | | branch has no errors when building and testing our client artifacts. |
|||| _ Patch Compile Tests _ |
| +1 :green_heart: | mvninstall | 0m 43s | | the patch passed |
| +1 :green_heart: | compile | 0m 48s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | javac | 0m 48s | | the patch passed |
| +1 :green_heart: | compile | 0m 44s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | javac | 0m 44s | | the patch passed |
| -1 :x: | blanks | 0m 0s | [/blanks-eol.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/artifact/out/blanks-eol.txt) | The patch has 5 line(s) that end in blanks. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply |
| -0 :warning: | checkstyle | 0m 33s | [/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/artifact/out/results-checkstyle-hadoop-yarn-project_hadoop-yarn_hadoop-yarn-common.txt) | hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common: The patch generated 11 new + 1 unchanged - 11 fixed = 12 total (was 12) |
| +1 :green_heart: | mvnsite | 0m 46s | | the patch passed |
| +1 :green_heart: | javadoc | 0m 47s | | the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 |
| +1 :green_heart: | javadoc | 0m 45s | | the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| +1 :green_heart: | spotbugs | 1m 45s | | the patch passed |
| +1 :green_heart: | shadedclient | 20m 48s | | patch has no errors when building and testing our client artifacts. |
|||| _ Other Tests _ |
| +1 :green_heart: | unit | 4m 59s | | hadoop-yarn-common in the patch passed. |
| +1 :green_heart: | asflicense | 0m 50s | | The patch does not generate ASF License warnings. |
| | | 100m 57s | | |
| Subsystem | Report/Notes |
|----------:|:-------------|
| Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/artifact/out/Dockerfile |
| GITHUB PR | https://github.com/apache/hadoop/pull/4430 |
| Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell detsecrets |
| uname | Linux 5c3a5eda045f 4.15.0-58-generic #64-Ubuntu SMP Tue Aug 6 11:12:41 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | dev-support/bin/hadoop.sh |
| git revision | trunk / 899c014e5dfe1213887dc83f3a2a43d5beea601d |
| Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
| Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/testReport/ |
| Max. process+thread count | 652 (vs. ulimit of 5500) |
| modules | C: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common U: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common |
| Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4430/2/console |
| versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
| Powered by | Apache Yetus 0.14.0 https://yetus.apache.org |
ASF GitHub Bot logged work - 14/Jun/22 11:58
- Time Spent: 10m
9uapaw commented on code in PR #4430:
URL: https://github.com/apache/hadoop/pull/4430#discussion_r896715900
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java:
##########
@@ -44,514 +44,391 @@
import org.junit.Test;
import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
public class TestAggregatedLogDeletionService {
-
+
+ private static final String T_FILE = "TFile";
+ private static final String USER_ME = "me";
+ private static final String DIR_HOST1 = "host1";
+ private static final String DIR_HOST2 = "host2";
+
+ private static final String ROOT = "mockfs://foo/";
+ private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
+ private static final String SUFFIX = "logs";
+ private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
+ private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
+
+ private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
+ ApplicationId appId,
+ String user, String suffix,
+ long modificationTime) {
+ Path path = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogDir, appId, user, suffix);
+ FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
+ return new PathWithFileStatus(path, fileStatus);
+ }
+
+ private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
+ return new FileStatus(0, true, 0, 0, modificationTime, path);
+ }
+
+ private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
+ long modificationTime) {
+ Path logPath = new Path(baseDir, childDir);
+ FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
+ return new PathWithFileStatus(logPath, fStatus);
+ }
+
+ private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
+ long modificationTime) {
+ Path logPath = new Path(baseDir, childDir);
+ FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
+ return new PathWithFileStatus(logPath, fStatus);
+ }
+
+ private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
+ String user,
+ String suffix,
+ ApplicationId appId,
+ long modificationTime) {
+ Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
+ FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
+ return new PathWithFileStatus(bucketDir, fStatus);
+ }
+
+ private static FileStatus createFileStatusWithLengthForFile(long length,
+ long modificationTime,
+ Path logPath) {
+ return new FileStatus(length, false, 1, 1, modificationTime, logPath);
+ }
+
+ private static FileStatus createFileStatusWithLengthForDir(long length,
+ long modificationTime,
+ Path logPath) {
+ return new FileStatus(length, true, 1, 1, modificationTime, logPath);
+ }
+
@Before
public void closeFilesystems() throws IOException {
// prevent the same mockfs instance from being reused due to FS cache
FileSystem.closeAll();
}
+ private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
+ Configuration conf = new Configuration();
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ retainCheckIntervalSeconds);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
+ conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
+ LogAggregationTFileController.class.getName());
+ return conf;
+ }
+
@Test
public void testDeletion() throws Exception {
long now = System.currentTimeMillis();
- long toDeleteTime = now - (2000*1000);
- long toKeepTime = now - (1500*1000);
- String root = "mockfs://foo/";
- String remoteRootLogDir = root+"tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- final Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
+ long toDeleteTime = now - (2000 * 1000);
+ long toKeepTime = now - (1500 * 1000);
+ Configuration conf = setupConfiguration(1800, -1);
- Path rootPath = new Path(root);
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir);
-
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[]{userDirStatus});
-
- ApplicationId appId1 =
- ApplicationId.newInstance(now, 1);
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus suffixDirStatus = new FileStatus(0, true,
- 0, 0, toDeleteTime, suffixDir);
- Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
- remoteRootLogPath, "me", suffix, appId1);
- FileStatus bucketDirStatus = new FileStatus(0, true, 0,
- 0, toDeleteTime, bucketDir);
- Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId1, "me", suffix);
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
- toDeleteTime, app1Dir);
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+ toKeepTime);
+
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
+
+ ApplicationId appId1 = ApplicationId.newInstance(now, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(now, 2);
+ ApplicationId appId3 = ApplicationId.newInstance(now, 3);
+ ApplicationId appId4 = ApplicationId.newInstance(now, 4);
+
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+ toDeleteTime);
+ PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
+ toDeleteTime);
+
+ PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+ USER_ME, SUFFIX, toDeleteTime);
+ PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+ USER_ME, SUFFIX, toDeleteTime);
+ PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
+ USER_ME, SUFFIX, toDeleteTime);
+ PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
+ USER_ME, SUFFIX, toDeleteTime);
+
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
+ app1.fileStatus, app2.fileStatus,
+ app3.fileStatus, app4.fileStatus});
- ApplicationId appId2 =
- ApplicationId.newInstance(now, 2);
- Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId2, "me", suffix);
- FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
- toDeleteTime, app2Dir);
-
- ApplicationId appId3 =
- ApplicationId.newInstance(now, 3);
- Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId3, "me", suffix);
- FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
- toDeleteTime, app3Dir);
-
- ApplicationId appId4 =
- ApplicationId.newInstance(now, 4);
- Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId4, "me", suffix);
- FileStatus app4DirStatus =
- new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
-
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixDirStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus});
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus, app2DirStatus,
- app3DirStatus, app4DirStatus});
-
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[]{});
-
-
- Path app2Log1 = new Path(app2Dir, "host1");
- FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
-
- Path app2Log2 = new Path(app2Dir, "host2");
- FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
-
- when(mockFs.listStatus(app2Dir)).thenReturn(
- new FileStatus[]{app2Log1Status, app2Log2Status});
-
- Path app3Log1 = new Path(app3Dir, "host1");
- FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
-
- Path app3Log2 = new Path(app3Dir, "host2");
- FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
-
- when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
-
- when(mockFs.listStatus(app3Dir)).thenReturn(
- new FileStatus[]{app3Log1Status, app3Log2Status});
-
- Path app4Log1 = new Path(app4Dir, "host1");
- FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
-
- Path app4Log2 = new Path(app4Dir, "host2");
- FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
- toKeepTime, app4Log2);
-
- when(mockFs.listStatus(app4Dir)).thenReturn(
- new FileStatus[]{app4Log1Status, app4Log2Status});
-
- final List<ApplicationId> finishedApplications =
- Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
- final List<ApplicationId> runningApplications =
- Collections.unmodifiableList(Arrays.asList(appId4));
+ PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+ toDeleteTime);
+ PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2,
+ toKeepTime);
+ PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
+ toDeleteTime);
+ PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
+ toDeleteTime);
+ PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
+ toDeleteTime);
+ PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
+
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
+ when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
+ app2Log2.fileStatus});
+ when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
+ app3Log2.fileStatus});
+ when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
+ app4Log2.fileStatus});
+ when(mockFs.delete(app3.path, true)).thenThrow(
+ new AccessControlException("Injected Error\nStack Trace :("));
+
+ final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
+ Arrays.asList(appId1, appId2, appId3));
+ final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
AggregatedLogDeletionService deletionService =
- new AggregatedLogDeletionService() {
- @Override
- protected ApplicationClientProtocol createRMClient()
- throws IOException {
- try {
- return createMockRMClient(finishedApplications,
- runningApplications);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- @Override
- protected void stopRMClient() {
- // DO NOTHING
- }
- };
+ new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
deletionService.init(conf);
deletionService.start();
- verify(mockFs, timeout(2000)).delete(app1Dir, true);
- verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
- verify(mockFs, timeout(2000)).delete(app3Dir, true);
- verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
- verify(mockFs, timeout(2000)).delete(app4Log1, true);
- verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
+ int timeout = 2000;
+ verify(mockFs, timeout(timeout)).delete(app1.path, true);
+ verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
+ verify(mockFs, timeout(timeout)).delete(app3.path, true);
+ verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
+ verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
+ verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
deletionService.stop();
}
@Test
public void testRefreshLogRetentionSettings() throws Exception {
long now = System.currentTimeMillis();
- //time before 2000 sec
long before2000Secs = now - (2000 * 1000);
- //time before 50 sec
long before50Secs = now - (50 * 1000);
- String root = "mockfs://foo/";
- String remoteRootLogDir = root + "tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- final Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
- "1");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
-
-
- Path rootPath = new Path(root);
+ int checkIntervalSeconds = 2;
+ int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
+
+ Configuration conf = setupConfiguration(1800, 1);
+
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
- userDir);
+ ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
+
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[] { userDirStatus });
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+ before50Secs);
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+ before50Secs);
+ PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+ USER_ME, SUFFIX, appId1, before50Secs);
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
- suffixDir);
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
- ApplicationId appId1 =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
//Set time last modified of app1Dir directory and its files to before2000Secs
- Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId1, "me", suffix);
- Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
- remoteRootLogPath, "me", suffix, appId1);
- FileStatus bucketDirStatus = new FileStatus(0, true, 0,
- 0, before50Secs, bucketDir);
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
- app1Dir);
-
- ApplicationId appId2 =
- ApplicationId.newInstance(System.currentTimeMillis(), 2);
- //Set time last modified of app1Dir directory and its files to before50Secs
- Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId2, "me", suffix);
- FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
- app2Dir);
+ PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+ USER_ME, SUFFIX, before2000Secs);
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixStatus });
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus });
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus, app2DirStatus });
-
- Path app1Log1 = new Path(app1Dir, "host1");
- FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
- app1Log1);
+ //Set time last modified of app1Dir directory and its files to before50Secs
+ PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+ USER_ME, SUFFIX, before50Secs);
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[] { app1Log1Status });
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
+ app2.fileStatus});
- Path app2Log1 = new Path(app2Dir, "host1");
- FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
- app2Log1);
+ PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
+ before2000Secs);
+ PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+ before50Secs);
- when(mockFs.listStatus(app2Dir)).thenReturn(
- new FileStatus[] { app2Log1Status });
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
+ when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
final List<ApplicationId> finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
- AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
- @Override
- protected Configuration createConf() {
- return conf;
- }
- @Override
- protected ApplicationClientProtocol createRMClient()
- throws IOException {
- try {
- return createMockRMClient(finishedApplications, null);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- @Override
- protected void stopRMClient() {
- // DO NOTHING
- }
- };
+ AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+ finishedApplications, conf);
deletionSvc.init(conf);
deletionSvc.start();
//app1Dir would be deleted since its done above log retention period
- verify(mockFs, timeout(10000)).delete(app1Dir, true);
- //app2Dir is not expected to be deleted since its below the threshold
- verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
-
- //Now,lets change the confs
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
- "2");
+ verify(mockFs, timeout(10000)).delete(app1.path, true);
+ //app2Dir is not expected to be deleted since it is below the threshold
+ verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
+
+ //Now, let's change the confs
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ checkIntervalSeconds);
//We have not called refreshLogSettings,hence don't expect to see the changed conf values
- Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+ assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
//refresh the log settings
deletionSvc.refreshLogRetentionSettings();
//Check interval time should reflect the new value
- Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+ Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
//app2Dir should be deleted since it falls above the threshold
- verify(mockFs, timeout(10000)).delete(app2Dir, true);
+ verify(mockFs, timeout(10000)).delete(app2.path, true);
deletionSvc.stop();
}
@Test
public void testCheckInterval() throws Exception {
- long RETENTION_SECS = 10 * 24 * 3600;
long now = System.currentTimeMillis();
- long toDeleteTime = now - RETENTION_SECS*1000;
-
- String root = "mockfs://foo/";
- String remoteRootLogDir = root+"tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
+ long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000;
+ Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
// prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll();
- Path rootPath = new Path(root);
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[]{userDirStatus});
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
- ApplicationId appId1 =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
- suffixDir);
- Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
- remoteRootLogPath, "me", suffix, appId1);
- Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId1, "me", suffix);
- FileStatus bucketDirStatus = new FileStatus(0, true, 0,
- 0, now, bucketDir);
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
+ ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+ USER_ME, SUFFIX, appId1, now);
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixDirStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus});
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus});
+ PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+ USER_ME, SUFFIX, now);
+ PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
- Path app1Log1 = new Path(app1Dir, "host1");
- FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[]{app1Log1Status});
+ final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
- final List<ApplicationId> finishedApplications =
- Collections.unmodifiableList(Arrays.asList(appId1));
-
- AggregatedLogDeletionService deletionSvc =
- new AggregatedLogDeletionService() {
- @Override
- protected ApplicationClientProtocol createRMClient()
- throws IOException {
- try {
- return createMockRMClient(finishedApplications, null);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- @Override
- protected void stopRMClient() {
- // DO NOTHING
- }
- };
+ AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+ finishedApplications);
deletionSvc.init(conf);
deletionSvc.start();
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
- verify(mockFs, never()).delete(app1Dir, true);
-
- // modify the timestamp of the logs and verify it's picked up quickly
- bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
- app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
- app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixDirStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus });
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus });
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[]{app1Log1Status});
-
- verify(mockFs, timeout(10000)).delete(app1Dir, true);
+ verify(mockFs, never()).delete(app1.path, true);
+
+ // modify the timestamp of the logs and verify if it's picked up quickly
+ app1.changeModificationTime(toDeleteTime);
+ app1Log1.changeModificationTime(toDeleteTime);
+ bucketDir.changeModificationTime(toDeleteTime);
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
+
+ verify(mockFs, timeout(10000)).delete(app1.path, true);
deletionSvc.stop();
}
@Test
public void testRobustLogDeletion() throws Exception {
- final long RETENTION_SECS = 10 * 24 * 3600;
-
- String root = "mockfs://foo/";
- String remoteRootLogDir = root+"tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class,
- FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
- "1");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
+ Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
// prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll();
- Path rootPath = new Path(root);
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
- FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
- Path bucketDir = new Path(suffixDir, String.valueOf(0));
- FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
-
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[]{userDirStatus});
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[]{suffixStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[]{bucketDirStatus});
-
- ApplicationId appId1 =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- Path app1Dir = new Path(bucketDir, appId1.toString());
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
- ApplicationId appId2 =
- ApplicationId.newInstance(System.currentTimeMillis(), 2);
- Path app2Dir = new Path(bucketDir, "application_a");
- FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
- ApplicationId appId3 =
- ApplicationId.newInstance(System.currentTimeMillis(), 3);
- Path app3Dir = new Path(bucketDir, appId3.toString());
- FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
-
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
- when(mockFs.listStatus(app2Dir)).thenReturn(
- new FileStatus[]{});
-
- when(mockFs.listStatus(app1Dir)).thenThrow(
- new RuntimeException("Should Be Caught and Logged"));
- Path app3Log3 = new Path(app3Dir, "host1");
- FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
- when(mockFs.listStatus(app3Dir)).thenReturn(
- new FileStatus[]{app3Log3Status});
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
- final List<ApplicationId> finishedApplications =
- Collections.unmodifiableList(Arrays.asList(appId1, appId3));
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
+ PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
+
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
- ApplicationClientProtocol rmClient =
- createMockRMClient(finishedApplications, null);
+ ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
+ ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
+
+ PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
+ PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);
Review Comment:
Is this intentionally set to `"application_a"` rather than `appId2.toString()`? `appId2` is created a few lines above but is not used for this directory.
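For context on the question above, here is a minimal, self-contained sketch of one plausible reading: in `testRobustLogDeletion`, the directory named `application_a` may be deliberately malformed so that the test covers a name that does not parse as an application id. The class `AppDirNameSketch`, its methods, and the regular expression are hypothetical illustrations and are not part of Hadoop or of this PR. The only assumption taken from YARN is that `ApplicationId.toString()` produces names of the form `application_<clusterTimestamp>_<sequence>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for illustration only; not the deletion service's real logic.
class AppDirNameSketch {

  // Assumed shape of a well-formed application directory name:
  // application_<clusterTimestamp>_<sequence>
  private static final Pattern APP_ID_NAME = Pattern.compile("application_\\d+_\\d+");

  // Returns true when the directory name has the assumed application id shape.
  static boolean looksLikeApplicationId(String dirName) {
    return APP_ID_NAME.matcher(dirName).matches();
  }

  // Keeps well-formed names and skips malformed ones instead of failing on them.
  static List<String> selectWellFormed(List<String> dirNames) {
    List<String> result = new ArrayList<>();
    for (String name : dirNames) {
      if (looksLikeApplicationId(name)) {
        result.add(name);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList(
        "application_1654950000000_0001",
        "application_a",
        "application_1654950000000_0003");
    // "application_a" is filtered out; the two well-formed names remain.
    System.out.println(selectWellFormed(names));
  }
}
```

If that reading is right, keeping the literal `"application_a"` preserves the original test's coverage, and a short code comment next to it would make the intent explicit.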
ASF GitHub Bot
logged work - 14/Jun/22 12:35
-
- Time Spent:
- 10m
-
szilard-nemeth commented on code in PR #4430:
URL: https://github.com/apache/hadoop/pull/4430#discussion_r896760612
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java:
##########
@@ -44,514 +44,391 @@
import org.junit.Test;
import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
public class TestAggregatedLogDeletionService {
-
+
+ private static final String T_FILE = "TFile";
+ private static final String USER_ME = "me";
+ private static final String DIR_HOST1 = "host1";
+ private static final String DIR_HOST2 = "host2";
+
+ private static final String ROOT = "mockfs://foo/";
+ private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
+ private static final String SUFFIX = "logs";
+ private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
+ private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
+
+ private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
+ ApplicationId appId,
+ String user, String suffix,
+ long modificationTime) {
+ Path path = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogDir, appId, user, suffix);
+ FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
+ return new PathWithFileStatus(path, fileStatus);
+ }
+
+ private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
+ return new FileStatus(0, true, 0, 0, modificationTime, path);
+ }
+
+ private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
+ long modificationTime) {
+ Path logPath = new Path(baseDir, childDir);
+ FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
+ return new PathWithFileStatus(logPath, fStatus);
+ }
+
+ private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
+ long modificationTime) {
+ Path logPath = new Path(baseDir, childDir);
+ FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
+ return new PathWithFileStatus(logPath, fStatus);
+ }
+
+ private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
+ String user,
+ String suffix,
+ ApplicationId appId,
+ long modificationTime) {
+ Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
+ FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
+ return new PathWithFileStatus(bucketDir, fStatus);
+ }
+
+ private static FileStatus createFileStatusWithLengthForFile(long length,
+ long modificationTime,
+ Path logPath) {
+ return new FileStatus(length, false, 1, 1, modificationTime, logPath);
+ }
+
+ private static FileStatus createFileStatusWithLengthForDir(long length,
+ long modificationTime,
+ Path logPath) {
+ return new FileStatus(length, true, 1, 1, modificationTime, logPath);
+ }
+
@Before
public void closeFilesystems() throws IOException {
// prevent the same mockfs instance from being reused due to FS cache
FileSystem.closeAll();
}
+ private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
+ Configuration conf = new Configuration();
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ retainCheckIntervalSeconds);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
+ conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
+ LogAggregationTFileController.class.getName());
+ return conf;
+ }
+
@Test
public void testDeletion() throws Exception {
long now = System.currentTimeMillis();
- long toDeleteTime = now - (2000*1000);
- long toKeepTime = now - (1500*1000);
- String root = "mockfs://foo/";
- String remoteRootLogDir = root+"tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- final Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
+ long toDeleteTime = now - (2000 * 1000);
+ long toKeepTime = now - (1500 * 1000);
+ Configuration conf = setupConfiguration(1800, -1);
- Path rootPath = new Path(root);
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir);
-
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[]{userDirStatus});
-
- ApplicationId appId1 =
- ApplicationId.newInstance(now, 1);
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus suffixDirStatus = new FileStatus(0, true,
- 0, 0, toDeleteTime, suffixDir);
- Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
- remoteRootLogPath, "me", suffix, appId1);
- FileStatus bucketDirStatus = new FileStatus(0, true, 0,
- 0, toDeleteTime, bucketDir);
- Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId1, "me", suffix);
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
- toDeleteTime, app1Dir);
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+ toKeepTime);
+
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
+
+ ApplicationId appId1 = ApplicationId.newInstance(now, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(now, 2);
+ ApplicationId appId3 = ApplicationId.newInstance(now, 3);
+ ApplicationId appId4 = ApplicationId.newInstance(now, 4);
+
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+ toDeleteTime);
+ PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
+ toDeleteTime);
+
+ PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+ USER_ME, SUFFIX, toDeleteTime);
+ PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+ USER_ME, SUFFIX, toDeleteTime);
+ PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
+ USER_ME, SUFFIX, toDeleteTime);
+ PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
+ USER_ME, SUFFIX, toDeleteTime);
+
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
+ app1.fileStatus, app2.fileStatus,
+ app3.fileStatus, app4.fileStatus});
- ApplicationId appId2 =
- ApplicationId.newInstance(now, 2);
- Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId2, "me", suffix);
- FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
- toDeleteTime, app2Dir);
-
- ApplicationId appId3 =
- ApplicationId.newInstance(now, 3);
- Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId3, "me", suffix);
- FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
- toDeleteTime, app3Dir);
-
- ApplicationId appId4 =
- ApplicationId.newInstance(now, 4);
- Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId4, "me", suffix);
- FileStatus app4DirStatus =
- new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
-
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixDirStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus});
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus, app2DirStatus,
- app3DirStatus, app4DirStatus});
-
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[]{});
-
-
- Path app2Log1 = new Path(app2Dir, "host1");
- FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
-
- Path app2Log2 = new Path(app2Dir, "host2");
- FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
-
- when(mockFs.listStatus(app2Dir)).thenReturn(
- new FileStatus[]{app2Log1Status, app2Log2Status});
-
- Path app3Log1 = new Path(app3Dir, "host1");
- FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
-
- Path app3Log2 = new Path(app3Dir, "host2");
- FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
-
- when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
-
- when(mockFs.listStatus(app3Dir)).thenReturn(
- new FileStatus[]{app3Log1Status, app3Log2Status});
-
- Path app4Log1 = new Path(app4Dir, "host1");
- FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
-
- Path app4Log2 = new Path(app4Dir, "host2");
- FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
- toKeepTime, app4Log2);
-
- when(mockFs.listStatus(app4Dir)).thenReturn(
- new FileStatus[]{app4Log1Status, app4Log2Status});
-
- final List<ApplicationId> finishedApplications =
- Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
- final List<ApplicationId> runningApplications =
- Collections.unmodifiableList(Arrays.asList(appId4));
+ PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+ toDeleteTime);
+ PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2,
+ toKeepTime);
+ PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
+ toDeleteTime);
+ PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
+ toDeleteTime);
+ PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
+ toDeleteTime);
+ PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
+
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
+ when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
+ app2Log2.fileStatus});
+ when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
+ app3Log2.fileStatus});
+ when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
+ app4Log2.fileStatus});
+ when(mockFs.delete(app3.path, true)).thenThrow(
+ new AccessControlException("Injected Error\nStack Trace :("));
+
+ final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
+ Arrays.asList(appId1, appId2, appId3));
+ final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
AggregatedLogDeletionService deletionService =
- new AggregatedLogDeletionService() {
- @Override
- protected ApplicationClientProtocol createRMClient()
- throws IOException {
- try {
- return createMockRMClient(finishedApplications,
- runningApplications);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- @Override
- protected void stopRMClient() {
- // DO NOTHING
- }
- };
+ new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
deletionService.init(conf);
deletionService.start();
- verify(mockFs, timeout(2000)).delete(app1Dir, true);
- verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
- verify(mockFs, timeout(2000)).delete(app3Dir, true);
- verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
- verify(mockFs, timeout(2000)).delete(app4Log1, true);
- verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
+ int timeout = 2000;
+ verify(mockFs, timeout(timeout)).delete(app1.path, true);
+ verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
+ verify(mockFs, timeout(timeout)).delete(app3.path, true);
+ verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
+ verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
+ verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
deletionService.stop();
}
@Test
public void testRefreshLogRetentionSettings() throws Exception {
long now = System.currentTimeMillis();
- //time before 2000 sec
long before2000Secs = now - (2000 * 1000);
- //time before 50 sec
long before50Secs = now - (50 * 1000);
- String root = "mockfs://foo/";
- String remoteRootLogDir = root + "tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- final Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
- "1");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
-
-
- Path rootPath = new Path(root);
+ int checkIntervalSeconds = 2;
+ int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
+
+ Configuration conf = setupConfiguration(1800, 1);
+
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
- userDir);
+ ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
+
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[] { userDirStatus });
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+ before50Secs);
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+ before50Secs);
+ PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+ USER_ME, SUFFIX, appId1, before50Secs);
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
- suffixDir);
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
- ApplicationId appId1 =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
//Set time last modified of app1Dir directory and its files to before2000Secs
- Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId1, "me", suffix);
- Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
- remoteRootLogPath, "me", suffix, appId1);
- FileStatus bucketDirStatus = new FileStatus(0, true, 0,
- 0, before50Secs, bucketDir);
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
- app1Dir);
-
- ApplicationId appId2 =
- ApplicationId.newInstance(System.currentTimeMillis(), 2);
- //Set time last modified of app1Dir directory and its files to before50Secs
- Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId2, "me", suffix);
- FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
- app2Dir);
+ PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+ USER_ME, SUFFIX, before2000Secs);
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixStatus });
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus });
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus, app2DirStatus });
-
- Path app1Log1 = new Path(app1Dir, "host1");
- FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
- app1Log1);
+ //Set time last modified of app2Dir directory and its files to before50Secs
+ PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+ USER_ME, SUFFIX, before50Secs);
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[] { app1Log1Status });
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
+ app2.fileStatus});
- Path app2Log1 = new Path(app2Dir, "host1");
- FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
- app2Log1);
+ PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
+ before2000Secs);
+ PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+ before50Secs);
- when(mockFs.listStatus(app2Dir)).thenReturn(
- new FileStatus[] { app2Log1Status });
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
+ when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
final List<ApplicationId> finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
- AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
- @Override
- protected Configuration createConf() {
- return conf;
- }
- @Override
- protected ApplicationClientProtocol createRMClient()
- throws IOException {
- try {
- return createMockRMClient(finishedApplications, null);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- @Override
- protected void stopRMClient() {
- // DO NOTHING
- }
- };
+ AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+ finishedApplications, conf);
deletionSvc.init(conf);
deletionSvc.start();
//app1Dir would be deleted since its done above log retention period
- verify(mockFs, timeout(10000)).delete(app1Dir, true);
- //app2Dir is not expected to be deleted since its below the threshold
- verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
-
- //Now,lets change the confs
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
- "2");
+ verify(mockFs, timeout(10000)).delete(app1.path, true);
+ //app2Dir is not expected to be deleted since it is below the threshold
+ verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
+
+ //Now, let's change the confs
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
+ conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ checkIntervalSeconds);
//We have not called refreshLogSettings,hence don't expect to see the changed conf values
- Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+ assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
//refresh the log settings
deletionSvc.refreshLogRetentionSettings();
//Check interval time should reflect the new value
- Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+ Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
//app2Dir should be deleted since it falls above the threshold
- verify(mockFs, timeout(10000)).delete(app2Dir, true);
+ verify(mockFs, timeout(10000)).delete(app2.path, true);
deletionSvc.stop();
}
@Test
public void testCheckInterval() throws Exception {
- long RETENTION_SECS = 10 * 24 * 3600;
long now = System.currentTimeMillis();
- long toDeleteTime = now - RETENTION_SECS*1000;
-
- String root = "mockfs://foo/";
- String remoteRootLogDir = root+"tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
+ long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000;
+ Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
// prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll();
- Path rootPath = new Path(root);
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[]{userDirStatus});
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
- ApplicationId appId1 =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
- suffixDir);
- Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
- remoteRootLogPath, "me", suffix, appId1);
- Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogPath, appId1, "me", suffix);
- FileStatus bucketDirStatus = new FileStatus(0, true, 0,
- 0, now, bucketDir);
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
+ ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+ USER_ME, SUFFIX, appId1, now);
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixDirStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus});
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus});
+ PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+ USER_ME, SUFFIX, now);
+ PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
- Path app1Log1 = new Path(app1Dir, "host1");
- FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[]{app1Log1Status});
+ final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
- final List<ApplicationId> finishedApplications =
- Collections.unmodifiableList(Arrays.asList(appId1));
-
- AggregatedLogDeletionService deletionSvc =
- new AggregatedLogDeletionService() {
- @Override
- protected ApplicationClientProtocol createRMClient()
- throws IOException {
- try {
- return createMockRMClient(finishedApplications, null);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- @Override
- protected void stopRMClient() {
- // DO NOTHING
- }
- };
+ AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+ finishedApplications);
deletionSvc.init(conf);
deletionSvc.start();
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
- verify(mockFs, never()).delete(app1Dir, true);
-
- // modify the timestamp of the logs and verify it's picked up quickly
- bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
- app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
- app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[] {suffixDirStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[] {bucketDirStatus });
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[] {app1DirStatus });
- when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[]{app1Log1Status});
-
- verify(mockFs, timeout(10000)).delete(app1Dir, true);
+ verify(mockFs, never()).delete(app1.path, true);
+
+ // modify the timestamp of the logs and verify if it's picked up quickly
+ app1.changeModificationTime(toDeleteTime);
+ app1Log1.changeModificationTime(toDeleteTime);
+ bucketDir.changeModificationTime(toDeleteTime);
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
+ when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
+ when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
+
+ verify(mockFs, timeout(10000)).delete(app1.path, true);
deletionSvc.stop();
}
@Test
public void testRobustLogDeletion() throws Exception {
- final long RETENTION_SECS = 10 * 24 * 3600;
-
- String root = "mockfs://foo/";
- String remoteRootLogDir = root+"tmp/logs";
- String suffix = "logs";
- String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
- Configuration conf = new Configuration();
- conf.setClass("fs.mockfs.impl", MockFileSystem.class,
- FileSystem.class);
- conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
- conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
- "1");
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
- conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
- conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
- LogAggregationTFileController.class.getName());
+ Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
// prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll();
- Path rootPath = new Path(root);
+ Path rootPath = new Path(ROOT);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
- Path remoteRootLogPath = new Path(remoteRootLogDir);
-
- Path userDir = new Path(remoteRootLogPath, "me");
- Path suffixDir = new Path(userDir, newSuffix);
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
- FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
- Path bucketDir = new Path(suffixDir, String.valueOf(0));
- FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
-
- when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[]{userDirStatus});
- when(mockFs.listStatus(userDir)).thenReturn(
- new FileStatus[]{suffixStatus});
- when(mockFs.listStatus(suffixDir)).thenReturn(
- new FileStatus[]{bucketDirStatus});
-
- ApplicationId appId1 =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- Path app1Dir = new Path(bucketDir, appId1.toString());
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
- ApplicationId appId2 =
- ApplicationId.newInstance(System.currentTimeMillis(), 2);
- Path app2Dir = new Path(bucketDir, "application_a");
- FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
- ApplicationId appId3 =
- ApplicationId.newInstance(System.currentTimeMillis(), 3);
- Path app3Dir = new Path(bucketDir, appId3.toString());
- FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
-
- when(mockFs.listStatus(bucketDir)).thenReturn(
- new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
- when(mockFs.listStatus(app2Dir)).thenReturn(
- new FileStatus[]{});
-
- when(mockFs.listStatus(app1Dir)).thenThrow(
- new RuntimeException("Should Be Caught and Logged"));
- Path app3Log3 = new Path(app3Dir, "host1");
- FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
- when(mockFs.listStatus(app3Dir)).thenReturn(
- new FileStatus[]{app3Log3Status});
+ Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
- final List<ApplicationId> finishedApplications =
- Collections.unmodifiableList(Arrays.asList(appId1, appId3));
+ PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
+ PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
+ PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
+
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
+ when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+ when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
- ApplicationClientProtocol rmClient =
- createMockRMClient(finishedApplications, null);
+ ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
+ ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
+
+ PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
+ PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);
Review Comment:
This was the same as in the original code, so I didn't want to alter the behaviour.
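For readers following the diff above, here is a minimal, self-contained sketch of the pattern the refactor appears to rely on: a small holder that keeps a path and its status together and can swap in a status with a new modification time. The real `PathWithFileStatus` helper is not shown in this excerpt, so the names `StatusStub`, `PathWithStatus` and `createDir` below are hypothetical stand-ins, and plain strings replace Hadoop's `Path` and `FileStatus`.

```java
public class PathWithStatusSketch {

  /** Hypothetical stand-in for Hadoop's FileStatus; keeps only what the sketch needs. */
  static final class StatusStub {
    final String path;
    final boolean directory;
    final long modificationTime;

    StatusStub(String path, boolean directory, long modificationTime) {
      this.path = path;
      this.directory = directory;
      this.modificationTime = modificationTime;
    }
  }

  /** Hypothetical analogue of the PathWithFileStatus helper used in the diff. */
  static final class PathWithStatus {
    final String path;
    StatusStub fileStatus;

    PathWithStatus(String path, StatusStub fileStatus) {
      this.path = path;
      this.fileStatus = fileStatus;
    }

    /** Replaces the status with a copy that carries the new modification time. */
    void changeModificationTime(long newTime) {
      fileStatus = new StatusStub(path, fileStatus.directory, newTime);
    }
  }

  /** Builds a directory entry under the given parent (assumed helper, not from the PR). */
  static PathWithStatus createDir(String parent, String name, long modTime) {
    String path = parent + "/" + name;
    return new PathWithStatus(path, new StatusStub(path, true, modTime));
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    PathWithStatus app1 = createDir("mockfs://foo/tmp/logs/me", "app1", now);

    // Age the entry by ten days, mirroring what the test does before expecting a delete.
    long tenDaysAgo = now - 10L * 24 * 3600 * 1000;
    app1.changeModificationTime(tenDaysAgo);

    System.out.println(app1.path + " modified at " + app1.fileStatus.modificationTime);
  }
}
```

Because this sketch replaces the status object rather than mutating it, any array built from the old `fileStatus` would still hold the stale value. That may be why the diff re-stubs `listStatus` after calling `changeModificationTime`, though the excerpt does not confirm how the real helper is implemented.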
ASF GitHub Bot
logged work - 14/Jun/22 14:15
Time Spent: 10m
9uapaw commented on PR #4430:
URL: https://github.com/apache/hadoop/pull/4430#issuecomment-1155247802
Thanks for the change @szilard-nemeth. Committed to trunk.
ASF GitHub Bot
logged work - 14/Jun/22 14:15
Time Spent: 10m
9uapaw closed pull request #4430:
YARN-11176. Refactor TestAggregatedLogDeletionService
URL: https://github.com/apache/hadoop/pull/4430