-
Notifications
You must be signed in to change notification settings - Fork 600
HDDS-14370. RandomAccessFileChannel to implement Closeable #9905
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
3f817aa
b2442ea
2093356
d3b025c
a23fb2d
1b2b862
9f5277d
5ee3220
c377bf3
2308b3b
ba747d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.hadoop.hdds.utils.io; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.File; | ||
| import java.io.FileNotFoundException; | ||
| import java.io.IOException; | ||
|
|
@@ -29,7 +30,7 @@ | |
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** {@link RandomAccessFile} and its {@link FileChannel}. */ | ||
| public class RandomAccessFileChannel { | ||
| public class RandomAccessFileChannel implements Closeable { | ||
| private static final Logger LOG = LoggerFactory.getLogger(RandomAccessFileChannel.class); | ||
|
|
||
| private File blockFile; | ||
|
|
@@ -47,9 +48,21 @@ public synchronized boolean isOpen() { | |
| /** Open the given file in read-only mode. */ | ||
| public synchronized void open(File file) throws FileNotFoundException { | ||
| Preconditions.assertNull(blockFile, "blockFile"); | ||
| blockFile = Objects.requireNonNull(file, "blockFile == null"); | ||
| raf = new RandomAccessFile(blockFile, "r"); | ||
| channel = raf.getChannel(); | ||
| final File f = Objects.requireNonNull(file, "blockFile == null"); | ||
| final RandomAccessFile newRaf = new RandomAccessFile(f, "r"); | ||
| try { | ||
| final FileChannel newChannel = newRaf.getChannel(); | ||
| blockFile = f; | ||
| raf = newRaf; | ||
| channel = newChannel; | ||
| } catch (Exception e) { | ||
| try { | ||
| newRaf.close(); | ||
| } catch (IOException closeEx) { | ||
| e.addSuppressed(closeEx); | ||
| } | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| /** Similar to {@link FileChannel#position(long)}. */ | ||
|
|
@@ -67,7 +80,7 @@ public synchronized void position(long newPosition) throws IOException { | |
| * this method tries to fill up the buffer until either | ||
| * (1) the buffer is full, or (2) it has reached end-of-stream. | ||
| * | ||
| * @return ture if the caller should continue to read; | ||
| * @return true if the caller should continue to read; | ||
| * otherwise, it has reached end-of-stream, return false; | ||
| */ | ||
| public synchronized boolean read(ByteBuffer buffer) throws IOException { | ||
|
|
@@ -86,22 +99,31 @@ public synchronized boolean read(ByteBuffer buffer) throws IOException { | |
| * In case of exception, this method catches the exception, logs a warning message, | ||
| * and then continue closing the remaining resources. | ||
| */ | ||
| @Override | ||
| public synchronized void close() { | ||
| if (blockFile == null) { | ||
| final File fileToClose = blockFile; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why add a new variable? doesn't the original logic work?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I think it fixes an existing logging bug. In the original code, blockFile was set to null before closing the channel, so if an IOException occurred, the log incorrectly printed null instead of the file name. |
||
| if (fileToClose == null) { | ||
| return; | ||
| } | ||
| blockFile = null; | ||
|
|
||
| try { | ||
| channel.close(); | ||
| channel = null; | ||
| if (channel != null) { | ||
| channel.close(); | ||
| } | ||
| } catch (IOException e) { | ||
| LOG.warn("Failed to close channel for {}", blockFile, e); | ||
| LOG.warn("Failed to close channel for {}", fileToClose, e); | ||
| } finally { | ||
| channel = null; | ||
| } | ||
| try { | ||
| raf.close(); | ||
| raf = null; | ||
| if (raf != null) { | ||
| raf.close(); | ||
| } | ||
| } catch (IOException e) { | ||
| LOG.warn("Failed to close RandomAccessFile for {}", blockFile, e); | ||
| LOG.warn("Failed to close RandomAccessFile for {}", fileToClose, e); | ||
| } finally { | ||
| raf = null; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,153 @@ | ||||||||||||
| /* | ||||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||||||||
| * contributor license agreements. See the NOTICE file distributed with | ||||||||||||
| * this work for additional information regarding copyright ownership. | ||||||||||||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||||||||
| * (the "License"); you may not use this file except in compliance with | ||||||||||||
| * the License. You may obtain a copy of the License at | ||||||||||||
| * | ||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||
| * | ||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||
| * limitations under the License. | ||||||||||||
| */ | ||||||||||||
|
|
||||||||||||
| package org.apache.hadoop.hdds.utils.io; | ||||||||||||
|
|
||||||||||||
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||||||||||||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||||||||||||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||||||||||||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||||||||||||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||||||||||||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||||||||||||
| import static org.mockito.Mockito.doThrow; | ||||||||||||
| import static org.mockito.Mockito.mock; | ||||||||||||
| import static org.mockito.Mockito.spy; | ||||||||||||
| import static org.mockito.Mockito.verify; | ||||||||||||
|
|
||||||||||||
| import java.io.Closeable; | ||||||||||||
| import java.io.File; | ||||||||||||
| import java.io.FileNotFoundException; | ||||||||||||
| import java.io.IOException; | ||||||||||||
| import java.io.RandomAccessFile; | ||||||||||||
| import java.lang.reflect.Field; | ||||||||||||
| import java.nio.ByteBuffer; | ||||||||||||
| import java.nio.channels.FileChannel; | ||||||||||||
| import java.nio.file.Path; | ||||||||||||
| import org.junit.jupiter.api.Test; | ||||||||||||
| import org.junit.jupiter.api.io.TempDir; | ||||||||||||
|
|
||||||||||||
| class TestRandomAccessFileChannel { | ||||||||||||
| @TempDir | ||||||||||||
| private Path tempDir; | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| void openFailureDoesNotLeaveOpenAndCloseIsSafe() { | ||||||||||||
| final RandomAccessFileChannel c = new RandomAccessFileChannel(); | ||||||||||||
| final File missing = tempDir.resolve("missing-file").toFile(); | ||||||||||||
|
|
||||||||||||
| assertThrows(FileNotFoundException.class, () -> c.open(missing)); | ||||||||||||
| assertFalse(c.isOpen()); | ||||||||||||
| assertDoesNotThrow(c::close); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| void closeIsIdempotent() throws Exception { | ||||||||||||
| final RandomAccessFileChannel c = new RandomAccessFileChannel(); | ||||||||||||
| final File f = tempDir.resolve("file").toFile(); | ||||||||||||
| try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { | ||||||||||||
| raf.write(1); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| c.open(f); | ||||||||||||
| assertTrue(c.isOpen()); | ||||||||||||
|
|
||||||||||||
| assertDoesNotThrow(c::close); | ||||||||||||
| assertFalse(c.isOpen()); | ||||||||||||
| assertDoesNotThrow(c::close); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| void closeContinuesToCloseRafEvenIfChannelCloseFails() throws Exception { | ||||||||||||
| final RandomAccessFileChannel c = new RandomAccessFileChannel(); | ||||||||||||
| final File f = tempDir.resolve("file-to-close").toFile(); | ||||||||||||
| try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { | ||||||||||||
| raf.write(1); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| final FileChannel failingChannel = mock(FileChannel.class); | ||||||||||||
| doThrow(new IOException("simulated close failure")).when(failingChannel).close(); | ||||||||||||
| final RandomAccessFile spyRaf = spy(new RandomAccessFile(f, "rw")); | ||||||||||||
| setField(c, "blockFile", f); | ||||||||||||
| setField(c, "channel", failingChannel); | ||||||||||||
| setField(c, "raf", spyRaf); | ||||||||||||
|
|
||||||||||||
| assertDoesNotThrow(c::close); | ||||||||||||
| verify(failingChannel).close(); | ||||||||||||
| verify(spyRaf).close(); | ||||||||||||
|
Comment on lines
+88
to
+90
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Optional: We can add |
||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| void closeDoesNotThrowWhenRafAndChannelAreNull() throws Exception { | ||||||||||||
| final RandomAccessFileChannel c = new RandomAccessFileChannel(); | ||||||||||||
| setField(c, "blockFile", tempDir.resolve("dummy").toFile()); | ||||||||||||
| setField(c, "channel", null); | ||||||||||||
| setField(c, "raf", null); | ||||||||||||
| assertDoesNotThrow(c::close); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| void readWithZeroSizedBuffer() throws Exception { | ||||||||||||
| final RandomAccessFileChannel c = new RandomAccessFileChannel(); | ||||||||||||
| final File f = tempDir.resolve("test-file").toFile(); | ||||||||||||
| try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { | ||||||||||||
| raf.write(new byte[]{1, 2, 3, 4, 5}); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| c.open(f); | ||||||||||||
|
Comment on lines
+102
to
+110
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should also have c.close() |
||||||||||||
| assertTrue(c.isOpen()); | ||||||||||||
|
|
||||||||||||
| final ByteBuffer zeroSizedBuffer = ByteBuffer.allocate(0); | ||||||||||||
| // Should return immediately without reading (buffer has no remaining capacity) | ||||||||||||
| assertTrue(c.read(zeroSizedBuffer), "read() should return true for zero-sized buffer"); | ||||||||||||
| // Verify buffer state unchanged | ||||||||||||
| assertEquals(0, zeroSizedBuffer.remaining()); | ||||||||||||
| assertEquals(0, zeroSizedBuffer.position()); | ||||||||||||
| assertEquals(0, zeroSizedBuffer.limit()); | ||||||||||||
|
|
||||||||||||
| c.close(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| void tryWithResourcesClosesAutomatically() throws Exception { | ||||||||||||
| final File f = tempDir.resolve("try-with-resources").toFile(); | ||||||||||||
| try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { | ||||||||||||
| raf.write(new byte[]{10, 20, 30}); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| final RandomAccessFileChannel c = new RandomAccessFileChannel(); | ||||||||||||
| c.open(f); | ||||||||||||
| assertTrue(c.isOpen()); | ||||||||||||
| // Closeable contract: close via try-with-resources helper | ||||||||||||
| closeAndVerify(c); | ||||||||||||
| assertFalse(c.isOpen(), "should be closed after try-with-resources"); | ||||||||||||
| assertDoesNotThrow(c::close, "double close should be safe"); | ||||||||||||
|
peterxcli marked this conversation as resolved.
|
||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private static void closeAndVerify(Closeable closeable) throws IOException { | ||||||||||||
| try (Closeable c = closeable) { | ||||||||||||
| assertNotNull(c); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private static void setField(Object target, String name, Object value) | ||||||||||||
| throws ReflectiveOperationException { | ||||||||||||
| final Field f = RandomAccessFileChannel.class.getDeclaredField(name); | ||||||||||||
| f.setAccessible(true); | ||||||||||||
| f.set(target, value); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
Uh oh!
There was an error while loading. Please reload this page.