001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.compress.compressors.snappy; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024 025import org.apache.commons.compress.compressors.CompressorOutputStream; 026import org.apache.commons.compress.compressors.lz77support.Parameters; 027import org.apache.commons.compress.utils.ByteUtils; 028 029/** 030 * CompressorOutputStream for the framing Snappy format. 031 * 032 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p> 033 * 034 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a> 035 * @since 1.14 036 * @NotThreadSafe 037 */ 038public class FramedSnappyCompressorOutputStream extends CompressorOutputStream { 039 // see spec: 040 // > However, we place an additional restriction that the uncompressed data 041 // > in a chunk must be no longer than 65536 bytes. This allows consumers to 042 // > easily use small fixed-size buffers. 043 private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16; 044 045 private final OutputStream out; 046 private final Parameters params; 047 private final PureJavaCrc32C checksum = new PureJavaCrc32C(); 048 // used in one-arg write method 049 private final byte[] oneByte = new byte[1]; 050 private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE]; 051 private int currentIndex = 0; 052 053 private final ByteUtils.ByteConsumer consumer; 054 055 /** 056 * Constructs a new output stream that compresses 057 * snappy-framed-compressed data to the specified output stream. 058 * @param out the OutputStream to which to write the compressed data 059 * @throws IOException if writing the signature fails 060 */ 061 public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException { 062 this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE) 063 .build()); 064 } 065 066 /** 067 * Constructs a new output stream that compresses 068 * snappy-framed-compressed data to the specified output stream. 069 * @param out the OutputStream to which to write the compressed data 070 * @param params parameters used to fine-tune compression, in 071 * particular to balance compression ratio vs compression speed. 072 * @throws IOException if writing the signature fails 073 */ 074 public FramedSnappyCompressorOutputStream(final OutputStream out, Parameters params) throws IOException { 075 this.out = out; 076 this.params = params; 077 consumer = new ByteUtils.OutputStreamByteConsumer(out); 078 out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE); 079 } 080 081 @Override 082 public void write(int b) throws IOException { 083 oneByte[0] = (byte) (b & 0xff); 084 write(oneByte); 085 } 086 087 @Override 088 public void write(byte[] data, int off, int len) throws IOException { 089 if (currentIndex + len > MAX_COMPRESSED_BUFFER_SIZE) { 090 flushBuffer(); 091 while (len > MAX_COMPRESSED_BUFFER_SIZE) { 092 System.arraycopy(data, off, buffer, 0, MAX_COMPRESSED_BUFFER_SIZE); 093 off += MAX_COMPRESSED_BUFFER_SIZE; 094 len -= MAX_COMPRESSED_BUFFER_SIZE; 095 currentIndex = MAX_COMPRESSED_BUFFER_SIZE; 096 flushBuffer(); 097 } 098 } 099 System.arraycopy(data, off, buffer, currentIndex, len); 100 currentIndex += len; 101 } 102 103 @Override 104 public void close() throws IOException { 105 try { 106 finish(); 107 } finally { 108 out.close(); 109 } 110 } 111 112 /** 113 * Compresses all remaining data and writes it to the stream, 114 * doesn't close the underlying stream. 115 * @throws IOException if an error occurs 116 */ 117 public void finish() throws IOException { 118 if (currentIndex > 0) { 119 flushBuffer(); 120 } 121 } 122 123 private void flushBuffer() throws IOException { 124 out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE); 125 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 126 try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) { 127 o.write(buffer, 0, currentIndex); 128 } 129 byte[] b = baos.toByteArray(); 130 writeLittleEndian(3, b.length + 4L /* CRC */); 131 writeCrc(); 132 out.write(b); 133 currentIndex = 0; 134 } 135 136 private void writeLittleEndian(final int numBytes, long num) throws IOException { 137 ByteUtils.toLittleEndian(consumer, num, numBytes); 138 } 139 140 private void writeCrc() throws IOException { 141 checksum.update(buffer, 0, currentIndex); 142 writeLittleEndian(4, mask(checksum.getValue())); 143 checksum.reset(); 144 } 145 146 static long mask(long x) { 147 // ugly, maybe we should just have used ints and deal with the 148 // overflow 149 x = ((x >> 15) | (x << 17)); 150 x += FramedSnappyCompressorInputStream.MASK_OFFSET; 151 x &= 0xffffFFFFL; 152 return x; 153 } 154}