GNU Classpath (0.95) | |
Frames | No Frames |
1: /* PipedInputStream.java -- Read portion of piped streams. 2: Copyright (C) 1998, 1999, 2000, 2001, 2003, 2005 Free Software Foundation, Inc. 3: 4: This file is part of GNU Classpath. 5: 6: GNU Classpath is free software; you can redistribute it and/or modify 7: it under the terms of the GNU General Public License as published by 8: the Free Software Foundation; either version 2, or (at your option) 9: any later version. 10: 11: GNU Classpath is distributed in the hope that it will be useful, but 12: WITHOUT ANY WARRANTY; without even the implied warranty of 13: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14: General Public License for more details. 15: 16: You should have received a copy of the GNU General Public License 17: along with GNU Classpath; see the file COPYING. If not, write to the 18: Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 19: 02110-1301 USA. 20: 21: Linking this library statically or dynamically with other modules is 22: making a combined work based on this library. Thus, the terms and 23: conditions of the GNU General Public License cover the whole 24: combination. 25: 26: As a special exception, the copyright holders of this library give you 27: permission to link this library with independent modules to produce an 28: executable, regardless of the license terms of these independent 29: modules, and to copy and distribute the resulting executable under 30: terms of your choice, provided that you also meet, for each linked 31: independent module, the terms and conditions of the license of that 32: module. An independent module is a module which is not derived from 33: or based on this library. If you modify this library, you may extend 34: this exception to your version of the library, but you are not 35: obligated to do so. If you do not wish to do so, delete this 36: exception statement from your version. */ 37: 38: package java.io; 39: 40: // NOTE: This implementation is very similar to that of PipedReader. If you 41: // fix a bug in here, chances are you should make a similar change to the 42: // PipedReader code. 43: 44: /** 45: * An input stream that reads its bytes from an output stream 46: * to which it is connected. 47: * <p> 48: * Data is read and written to an internal buffer. It is highly recommended 49: * that the <code>PipedInputStream</code> and connected 50: * <code>PipedOutputStream</code> 51: * be part of different threads. If they are not, the read and write 52: * operations could deadlock their thread. 53: * 54: * @specnote The JDK implementation appears to have some undocumented 55: * functionality where it keeps track of what thread is writing 56: * to pipe and throws an IOException if that thread susequently 57: * dies. This behaviour seems dubious and unreliable - we don't 58: * implement it. 59: * 60: * @author Aaron M. Renn (arenn@urbanophile.com) 61: */ 62: public class PipedInputStream extends InputStream 63: { 64: /** PipedOutputStream to which this is connected. Null only if this 65: * InputStream hasn't been connected yet. */ 66: PipedOutputStream source; 67: 68: /** Set to true if close() has been called on this InputStream. */ 69: boolean closed; 70: 71: 72: /** 73: * The size of the internal buffer used for input/output. 74: */ 75: /* The "Constant Field Values" Javadoc of the Sun J2SE 1.4 76: * specifies 1024. 77: */ 78: protected static final int PIPE_SIZE = 1024; 79: 80: 81: /** 82: * This is the internal circular buffer used for storing bytes written 83: * to the pipe and from which bytes are read by this stream 84: */ 85: protected byte[] buffer = new byte[PIPE_SIZE]; 86: 87: /** 88: * The index into buffer where the next byte from the connected 89: * <code>PipedOutputStream</code> will be written. If this variable is 90: * equal to <code>out</code>, then the buffer is full. If set to < 0, 91: * the buffer is empty. 92: */ 93: protected int in = -1; 94: 95: /** 96: * This index into the buffer where bytes will be read from. 97: */ 98: protected int out = 0; 99: 100: /** Buffer used to implement single-argument read/receive */ 101: private byte[] read_buf = new byte[1]; 102: 103: /** 104: * Creates a new <code>PipedInputStream</code> that is not connected to a 105: * <code>PipedOutputStream</code>. It must be connected before bytes can 106: * be read from this stream. 107: */ 108: public PipedInputStream() 109: { 110: } 111: 112: /** 113: * This constructor creates a new <code>PipedInputStream</code> and connects 114: * it to the passed in <code>PipedOutputStream</code>. The stream is then 115: * ready for reading. 116: * 117: * @param source The <code>PipedOutputStream</code> to connect this 118: * stream to 119: * 120: * @exception IOException If <code>source</code> is already connected. 121: */ 122: public PipedInputStream(PipedOutputStream source) throws IOException 123: { 124: connect(source); 125: } 126: 127: /** 128: * This method connects this stream to the passed in 129: * <code>PipedOutputStream</code>. 130: * This stream is then ready for reading. If this stream is already 131: * connected or has been previously closed, then an exception is thrown 132: * 133: * @param source The <code>PipedOutputStream</code> to connect this stream to 134: * 135: * @exception IOException If this PipedInputStream or <code>source</code> 136: * has been connected already. 137: */ 138: public void connect(PipedOutputStream source) throws IOException 139: { 140: // The JDK (1.3) does not appear to check for a previously closed 141: // connection here. 142: 143: if (this.source != null || source.sink != null) 144: throw new IOException ("Already connected"); 145: 146: source.sink = this; 147: this.source = source; 148: } 149: 150: /** 151: * This method receives a byte of input from the source PipedOutputStream. 152: * If the internal circular buffer is full, this method blocks. 153: * 154: * @param val The byte to write to this stream 155: * 156: * @exception IOException if error occurs 157: * @specnote Weird. This method must be some sort of accident. 158: */ 159: protected synchronized void receive(int val) throws IOException 160: { 161: read_buf[0] = (byte) (val & 0xff); 162: receive (read_buf, 0, 1); 163: } 164: 165: /** 166: * This method is used by the connected <code>PipedOutputStream</code> to 167: * write bytes into the buffer. 168: * 169: * @param buf The array containing bytes to write to this stream 170: * @param offset The offset into the array to start writing from 171: * @param len The number of bytes to write. 172: * 173: * @exception IOException If an error occurs 174: * @specnote This code should be in PipedOutputStream.write, but we 175: * put it here in order to support that bizarre recieve(int) 176: * method. 177: */ 178: synchronized void receive(byte[] buf, int offset, int len) 179: throws IOException 180: { 181: if (closed) 182: throw new IOException ("Pipe closed"); 183: 184: int bufpos = offset; 185: int copylen; 186: 187: while (len > 0) 188: { 189: try 190: { 191: while (in == out) 192: { 193: // The pipe is full. Wake up any readers and wait for them. 194: notifyAll(); 195: wait(); 196: // The pipe could have been closed while we were waiting. 197: if (closed) 198: throw new IOException ("Pipe closed"); 199: } 200: } 201: catch (InterruptedException ix) 202: { 203: throw new InterruptedIOException (); 204: } 205: 206: if (in < 0) // The pipe is empty. 207: in = 0; 208: 209: // Figure out how many bytes from buf can be copied without 210: // overrunning out or going past the length of the buffer. 211: if (in < out) 212: copylen = Math.min (len, out - in); 213: else 214: copylen = Math.min (len, buffer.length - in); 215: 216: // Copy bytes until the pipe is filled, wrapping if necessary. 217: System.arraycopy(buf, bufpos, buffer, in, copylen); 218: len -= copylen; 219: bufpos += copylen; 220: in += copylen; 221: if (in == buffer.length) 222: in = 0; 223: } 224: // Notify readers that new data is in the pipe. 225: notifyAll(); 226: } 227: 228: /** 229: * This method reads one byte from the stream. 230: * -1 is returned to indicated that no bytes can be read 231: * because the end of the stream was reached. If the stream is already 232: * closed, a -1 will again be returned to indicate the end of the stream. 233: * 234: * <p>This method will block if no byte is available to be read.</p> 235: * 236: * @return the value of the read byte value, or -1 of the end of the stream 237: * was reached 238: * 239: * @throws IOException if an error occured 240: */ 241: public int read() throws IOException 242: { 243: // Method operates by calling the multibyte overloaded read method 244: // Note that read_buf is an internal instance variable. I allocate it 245: // there to avoid constant reallocation overhead for applications that 246: // call this method in a loop at the cost of some unneeded overhead 247: // if this method is never called. 248: 249: int r = read(read_buf, 0, 1); 250: return r != -1 ? (read_buf[0] & 0xff) : -1; 251: } 252: 253: /** 254: * This method reads bytes from the stream into a caller supplied buffer. 255: * It starts storing bytes at position <code>offset</code> into the 256: * buffer and 257: * reads a maximum of <code>len</code> bytes. Note that this method 258: * can actually 259: * read fewer than <code>len</code> bytes. The actual number of bytes 260: * read is 261: * returned. A -1 is returned to indicated that no bytes can be read 262: * because the end of the stream was reached - ie close() was called on the 263: * connected PipedOutputStream. 264: * <p> 265: * This method will block if no bytes are available to be read. 266: * 267: * @param buf The buffer into which bytes will be stored 268: * @param offset The index into the buffer at which to start writing. 269: * @param len The maximum number of bytes to read. 270: * 271: * @exception IOException If <code>close()</code> was called on this Piped 272: * InputStream. 273: */ 274: public synchronized int read(byte[] buf, int offset, int len) 275: throws IOException 276: { 277: if (source == null) 278: throw new IOException ("Not connected"); 279: if (closed) 280: throw new IOException ("Pipe closed"); 281: 282: // Don't block if nothing was requested. 283: if (len == 0) 284: return 0; 285: 286: // If the buffer is empty, wait until there is something in the pipe 287: // to read. 288: try 289: { 290: while (in < 0) 291: { 292: if (source.closed) 293: return -1; 294: wait(); 295: } 296: } 297: catch (InterruptedException ix) 298: { 299: throw new InterruptedIOException(); 300: } 301: 302: int total = 0; 303: int copylen; 304: 305: while (true) 306: { 307: // Figure out how many bytes from the pipe can be copied without 308: // overrunning in or going past the length of buf. 309: if (out < in) 310: copylen = Math.min (len, in - out); 311: else 312: copylen = Math.min (len, buffer.length - out); 313: 314: System.arraycopy (buffer, out, buf, offset, copylen); 315: offset += copylen; 316: len -= copylen; 317: out += copylen; 318: total += copylen; 319: 320: if (out == buffer.length) 321: out = 0; 322: 323: if (out == in) 324: { 325: // Pipe is now empty. 326: in = -1; 327: out = 0; 328: } 329: 330: // If output buffer is filled or the pipe is empty, we're done. 331: if (len == 0 || in == -1) 332: { 333: // Notify any waiting outputstream that there is now space 334: // to write. 335: notifyAll(); 336: return total; 337: } 338: } 339: } 340: 341: /** 342: * This method returns the number of bytes that can be read from this stream 343: * before blocking could occur. This is the number of bytes that are 344: * currently unread in the internal circular buffer. Note that once this 345: * many additional bytes are read, the stream may block on a subsequent 346: * read, but it not guaranteed to block. 347: * 348: * @return The number of bytes that can be read before blocking might occur 349: * 350: * @exception IOException If an error occurs 351: */ 352: public synchronized int available() throws IOException 353: { 354: // The JDK 1.3 implementation does not appear to check for the closed or 355: // unconnected stream conditions here. 356: 357: if (in < 0) 358: return 0; 359: else if (out < in) 360: return in - out; 361: else 362: return (buffer.length - out) + in; 363: } 364: 365: /** 366: * This methods closes the stream so that no more data can be read 367: * from it. 368: * 369: * @exception IOException If an error occurs 370: */ 371: public synchronized void close() throws IOException 372: { 373: closed = true; 374: // Wake any thread which may be in receive() waiting to write data. 375: notifyAll(); 376: } 377: }
GNU Classpath (0.95) |