/*
Copyright (c) 2012 James Ahlborn

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16  
17  package com.healthmarketscience.jackcess.util;
18  
19  import java.io.File;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  import java.io.RandomAccessFile;
24  import java.nio.ByteBuffer;
25  import java.nio.MappedByteBuffer;
26  import java.nio.channels.Channels;
27  import java.nio.channels.FileChannel;
28  import java.nio.channels.FileLock;
29  import java.nio.channels.NonWritableChannelException;
30  import java.nio.channels.ReadableByteChannel;
31  import java.nio.channels.WritableByteChannel;
32  import java.nio.file.OpenOption;
33  import java.nio.file.Path;
34  import java.nio.file.StandardOpenOption;
35  
36  import com.healthmarketscience.jackcess.Database;
37  import com.healthmarketscience.jackcess.DatabaseBuilder;
38  import com.healthmarketscience.jackcess.impl.ByteUtil;
39  import com.healthmarketscience.jackcess.impl.DatabaseImpl;
40  
41  /**
42   * FileChannel implementation which maintains the entire "file" in memory.
43   * This enables working with a Database entirely in memory (for situations
44   * where disk usage may not be possible or desirable).  Obviously, this
45   * requires enough jvm heap space to fit the file data.  Use one of the
46   * {@code newChannel()} methods to construct an instance of this class.
47   * <p>
48   * In order to use this class with a Database, you <i>must</i> use the {@link
49   * DatabaseBuilder} to open/create the Database instance, passing an instance
50   * of this class to the {@link DatabaseBuilder#setChannel} method.
51   * <p>
52   * Implementation note: this class is optimized for use with {@link Database}.
53   * Therefore not all methods may be implemented and individual read/write
54   * operations are only supported within page boundaries.
55   *
56   * @author James Ahlborn
57   * @usage _advanced_class_
58   */
59  public class MemFileChannel extends FileChannel
60  {
61    /** read-only channel access mode */
62    public static final String RO_CHANNEL_MODE = "r";
63    /** read/write channel access mode */
64    public static final String RW_CHANNEL_MODE = "rw";
65  
66    private static final byte[][] EMPTY_DATA = new byte[0][];
67  
68    // use largest possible Jet "page size" to ensure that reads/writes will
69    // always be within a single chunk
70    private static final int CHUNK_SIZE = 4096;
71    // this ensures that an "empty" mdb will fit in the initial chunk table
72    private static final int INIT_CHUNKS = 128;
73  
74    /** current read/write position */
75    private long _position;
76    /** current amount of actual data in the file */
77    private long _size;
78    /** chunks containing the file data.  the length of the chunk array is
79        always a power of 2 and the chunks are always CHUNK_SIZE. */
80    private byte[][] _data;
81  
82    private MemFileChannel()
83    {
84      this(0L, 0L, EMPTY_DATA);
85    }
86  
87    private MemFileChannel(long position, long size, byte[][] data) {
88      _position = position;
89      _size = size;
90      _data = data;
91    }
92  
93    /**
94     * Creates a new read/write, empty MemFileChannel.
95     */
96    public static MemFileChannel newChannel() {
97      return new MemFileChannel();
98    }
99  
100   /**
101    * Creates a new read/write MemFileChannel containing the contents of the
102    * given File.  Note, modifications to the returned channel will <i>not</i>
103    * affect the original File source.
104    */
105   public static MemFileChannel newChannel(File file) throws IOException {
106     return newChannel(file, RW_CHANNEL_MODE);
107   }
108 
109   /**
110    * Creates a new MemFileChannel containing the contents of the
111    * given File with the given mode (for mode details see
112    * {@link RandomAccessFile#RandomAccessFile(File,String)}).  Note,
113    * modifications to the returned channel will <i>not</i> affect the original
114    * File source.
115    */
116   public static MemFileChannel newChannel(File file, String mode)
117     throws IOException
118   {
119     FileChannel in = null;
120     try {
121       return newChannel(in = new RandomAccessFile(
122                             file, RO_CHANNEL_MODE).getChannel(),
123                         mode);
124     } finally {
125       ByteUtil.closeQuietly(in);
126     }
127   }
128 
129   /**
130    * Creates a new MemFileChannel containing the contents of the
131    * given Path with the given mode (for mode details see
132    * {@link RandomAccessFile#RandomAccessFile(File,String)}).  Note,
133    * modifications to the returned channel will <i>not</i> affect the original
134    * File source.
135    */
136   public static MemFileChannel newChannel(Path file, OpenOption... opts)
137     throws IOException
138   {
139     FileChannel in = null;
140     try {
141       String mode = RO_CHANNEL_MODE;
142       if(opts != null) {
143         for(OpenOption opt : opts) {
144           if(opt == StandardOpenOption.WRITE) {
145             mode = RW_CHANNEL_MODE;
146             break;
147           }
148         }
149       }
150       return newChannel(in = FileChannel.open(file, StandardOpenOption.READ),
151                         mode);
152     } finally {
153       ByteUtil.closeQuietly(in);
154     }
155   }
156 
157   /**
158    * Creates a new read/write MemFileChannel containing the contents of the
159    * given Path.  Note, modifications to the returned channel will <i>not</i>
160    * affect the original File source.
161    */
162   public static MemFileChannel newChannel(Path file) throws IOException {
163     return newChannel(file, DatabaseImpl.RW_CHANNEL_OPTS);
164   }
165 
166   /**
167    * Creates a new read/write MemFileChannel containing the contents of the
168    * given InputStream.
169    */
170   public static MemFileChannel newChannel(InputStream in) throws IOException {
171     return newChannel(in, RW_CHANNEL_MODE);
172   }
173 
174   /**
175    * Creates a new MemFileChannel containing the contents of the
176    * given InputStream with the given mode (for mode details see
177    * {@link RandomAccessFile#RandomAccessFile(File,String)}).
178    */
179   public static MemFileChannel newChannel(InputStream in, String mode)
180     throws IOException
181   {
182     return newChannel(Channels.newChannel(in), mode);
183   }
184 
185   /**
186    * Creates a new read/write MemFileChannel containing the contents of the
187    * given ReadableByteChannel.
188    */
189   public static MemFileChannel newChannel(ReadableByteChannel in)
190     throws IOException
191   {
192     return newChannel(in, RW_CHANNEL_MODE);
193   }
194 
195   /**
196    * Creates a new MemFileChannel containing the contents of the
197    * given ReadableByteChannel with the given mode (for mode details see
198    * {@link RandomAccessFile#RandomAccessFile(File,String)}).
199    */
200   public static MemFileChannel newChannel(ReadableByteChannel in, String mode)
201     throws IOException
202   {
203     MemFileChannel/MemFileChannel.html#MemFileChannel">MemFileChannel channel = new MemFileChannel();
204     channel.transferFrom(in, 0L, Long.MAX_VALUE);
205     if(!mode.contains("w")) {
206       channel = new ReadOnlyChannel(channel);
207     }
208     return channel;
209   }
210 
211   @Override
212   public int read(ByteBuffer dst) throws IOException {
213     int bytesRead = read(dst, _position);
214     if(bytesRead > 0) {
215       _position += bytesRead;
216     }
217     return bytesRead;
218   }
219 
220   @Override
221   public int read(ByteBuffer dst, long position) throws IOException {
222     if(position >= _size) {
223       return -1;
224     }
225 
226     int numBytes = (int)Math.min(dst.remaining(), _size - position);
227     int rem = numBytes;
228 
229     while(rem > 0) {
230       byte[] chunk = _data[getChunkIndex(position)];
231       int chunkOffset = getChunkOffset(position);
232       int bytesRead = Math.min(rem, CHUNK_SIZE - chunkOffset);
233       dst.put(chunk, chunkOffset, bytesRead);
234       rem -= bytesRead;
235       position += bytesRead;
236     }
237 
238     return numBytes;
239   }
240 
241   @Override
242   public int write(ByteBuffer src) throws IOException {
243     int bytesWritten = write(src, _position);
244     _position += bytesWritten;
245     return bytesWritten;
246   }
247 
248   @Override
249   public int write(ByteBuffer src, long position) throws IOException {
250     int numBytes = src.remaining();
251     long newSize = position + numBytes;
252     ensureCapacity(newSize);
253 
254     int rem = numBytes;
255     while(rem > 0) {
256       byte[] chunk = _data[getChunkIndex(position)];
257       int chunkOffset = getChunkOffset(position);
258       int bytesWritten = Math.min(rem, CHUNK_SIZE - chunkOffset);
259       src.get(chunk, chunkOffset, bytesWritten);
260       rem -= bytesWritten;
261       position += bytesWritten;
262     }
263 
264     if(newSize > _size) {
265       _size = newSize;
266     }
267 
268     return numBytes;
269   }
270 
271   @Override
272   public long position() throws IOException {
273     return _position;
274   }
275 
276   @Override
277   public FileChannel position(long newPosition) throws IOException {
278     if(newPosition < 0L) {
279       throw new IllegalArgumentException("negative position");
280     }
281     _position = newPosition;
282     return this;
283   }
284 
285   @Override
286   public long size() throws IOException {
287     return _size;
288   }
289 
290   @Override
291   public FileChannel truncate(long newSize) throws IOException {
292     if(newSize < 0L) {
293       throw new IllegalArgumentException("negative size");
294     }
295     if(newSize < _size) {
296       // we'll optimize for memory over speed and aggressively free unused
297       // chunks
298       for(int i = getNumChunks(newSize); i < getNumChunks(_size); ++i) {
299         _data[i] = null;
300       }
301       _size = newSize;
302     }
303     _position = Math.min(newSize, _position);
304     return this;
305   }
306 
307   @Override
308   public void force(boolean metaData) throws IOException {
309     // nothing to do
310   }
311 
312   /**
313    * Convenience method for writing the entire contents of this channel to the
314    * given destination channel.
315    * @see #transferTo(long,long,WritableByteChannel)
316    */
317   public long transferTo(WritableByteChannel dst)
318     throws IOException
319   {
320     return transferTo(0L, _size, dst);
321   }
322 
323   @Override
324   public long transferTo(long position, long count, WritableByteChannel dst)
325     throws IOException
326   {
327     if(position >= _size) {
328       return 0L;
329     }
330 
331     count = Math.min(count, _size - position);
332 
333     int chunkIndex = getChunkIndex(position);
334     int chunkOffset = getChunkOffset(position);
335 
336     long numBytes = 0L;
337     while(count > 0L) {
338 
339       int chunkBytes = (int)Math.min(count, CHUNK_SIZE - chunkOffset);
340       ByteBuffer src = ByteBuffer.wrap(_data[chunkIndex], chunkOffset,
341                                        chunkBytes);
342 
343       do {
344         int bytesWritten = dst.write(src);
345         if(bytesWritten == 0L) {
346           // dst full
347           return numBytes;
348         }
349         numBytes += bytesWritten;
350         count -= bytesWritten;
351       } while(src.hasRemaining());
352 
353       ++chunkIndex;
354       chunkOffset = 0;
355     }
356 
357     return numBytes;
358   }
359 
360   /**
361    * Convenience method for writing the entire contents of this channel to the
362    * given destination stream.
363    * @see #transferTo(long,long,WritableByteChannel)
364    */
365   public long transferTo(OutputStream dst)
366     throws IOException
367   {
368     return transferTo(0L, _size, dst);
369   }
370 
371   /**
372    * Convenience method for writing the selected portion of this channel to
373    * the given destination stream.
374    * @see #transferTo(long,long,WritableByteChannel)
375    */
376   public long transferTo(long position, long count, OutputStream dst)
377     throws IOException
378   {
379     return transferTo(position, count, Channels.newChannel(dst));
380   }
381 
382   @Override
383   public long transferFrom(ReadableByteChannel src,
384                            long position, long count)
385     throws IOException
386   {
387     int chunkIndex = getChunkIndex(position);
388     int chunkOffset = getChunkOffset(position);
389 
390     long numBytes = 0L;
391     while(count > 0L) {
392 
393       ensureCapacity(position + numBytes + 1);
394 
395       int chunkBytes = (int)Math.min(count, CHUNK_SIZE - chunkOffset);
396       ByteBuffer dst = ByteBuffer.wrap(_data[chunkIndex], chunkOffset,
397                                        chunkBytes);
398       do {
399         int bytesRead = src.read(dst);
400         if(bytesRead <= 0) {
401           // src empty
402           return numBytes;
403         }
404         numBytes += bytesRead;
405         count -= bytesRead;
406         _size = Math.max(_size, position + numBytes);
407       } while(dst.hasRemaining());
408 
409       ++chunkIndex;
410       chunkOffset = 0;
411     }
412 
413     return numBytes;
414   }
415 
416   @Override
417   protected void implCloseChannel() throws IOException {
418     // release data
419     _data = EMPTY_DATA;
420     _size = _position = 0L;
421   }
422 
423   private void ensureCapacity(long newSize)
424   {
425     if(newSize <= _size) {
426       // nothing to do
427       return;
428     }
429 
430     int newNumChunks = getNumChunks(newSize);
431     int numChunks = getNumChunks(_size);
432 
433     if(newNumChunks > _data.length) {
434 
435       // need to extend chunk array (use powers of 2)
436       int newDataLen = Math.max(_data.length, INIT_CHUNKS);
437       while(newDataLen < newNumChunks) {
438         newDataLen <<= 1;
439       }
440 
441       byte[][] newData = new byte[newDataLen][];
442 
443       // copy existing chunks
444       System.arraycopy(_data, 0, newData, 0, numChunks);
445 
446       _data = newData;
447     }
448 
449     // allocate new chunks
450     for(int i = numChunks; i < newNumChunks; ++i) {
451       _data[i] = new byte[CHUNK_SIZE];
452     }
453   }
454 
455   private static int getChunkIndex(long pos) {
456     return (int)(pos / CHUNK_SIZE);
457   }
458 
459   private static int getChunkOffset(long pos) {
460     return (int)(pos % CHUNK_SIZE);
461   }
462 
463   private static int getNumChunks(long size) {
464     return getChunkIndex(size + CHUNK_SIZE - 1);
465   }
466 
467   @Override
468   public long write(ByteBuffer[] srcs, int offset, int length)
469     throws IOException
470   {
471     long numBytes = 0L;
472     for(int i = offset; i < offset + length; ++i) {
473       numBytes += write(srcs[i]);
474     }
475     return numBytes;
476   }
477 
478   @Override
479   public long read(ByteBuffer[] dsts, int offset, int length)
480     throws IOException
481   {
482     long numBytes = 0L;
483     for(int i = offset; i < offset + length; ++i) {
484       if(_position >= _size) {
485         return ((numBytes > 0L) ? numBytes : -1L);
486       }
487       numBytes += read(dsts[i]);
488     }
489     return numBytes;
490   }
491 
492   @Override
493   public MappedByteBuffer map(MapMode mode, long position, long size)
494     throws IOException
495   {
496     throw new UnsupportedOperationException();
497   }
498 
499   @Override
500   public FileLock lock(long position, long size, boolean shared)
501     throws IOException
502   {
503     throw new UnsupportedOperationException();
504   }
505 
506   @Override
507   public FileLock tryLock(long position, long size, boolean shared)
508     throws IOException
509   {
510     throw new UnsupportedOperationException();
511   }
512 
513   /**
514    * Subclass of MemFileChannel which is read-only.
515    */
516   private static final class ReadOnlyChannel extends MemFileChannel
517   {
518     private ReadOnlyChannel(MemFileChannel channel)
519     {
520       super(channel._position, channel._size, channel._data);
521     }
522 
523     @Override
524     public int write(ByteBuffer src, long position) throws IOException {
525       throw new NonWritableChannelException();
526     }
527 
528     @Override
529     public FileChannel truncate(long newSize) throws IOException {
530       throw new NonWritableChannelException();
531     }
532 
533     @Override
534     public long transferFrom(ReadableByteChannel src,
535                              long position, long count)
536       throws IOException
537     {
538       throw new NonWritableChannelException();
539     }
540   }
541 }