001/*002 *  Licensed to the Apache Software Foundation (ASF) under one or more003 *  contributor license agreements.  See the NOTICE file distributed with004 *  this work for additional information regarding copyright ownership.005 *  The ASF licenses this file to You under the Apache License, Version 2.0006 *  (the "License"); you may not use this file except in compliance with007 *  the License.  You may obtain a copy of the License at008 *009 *      http://www.apache.org/licenses/LICENSE-2.0010 *011 *  Unless required by applicable law or agreed to in writing, software012 *  distributed under the License is distributed on an "AS IS" BASIS,013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.014 *  See the License for the specific language governing permissions and015 *  limitations under the License.016 *017 */018package org.apache.commons.compress.archivers.zip;019020import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;021import org.apache.commons.compress.parallel.InputStreamSupplier;022import org.apache.commons.compress.parallel.ScatterGatherBackingStore;023import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;024025import java.io.File;026import java.io.IOException;027import java.util.ArrayList;028import java.util.List;029import java.util.concurrent.Callable;030import java.util.concurrent.ExecutionException;031import java.util.concurrent.ExecutorService;032import java.util.concurrent.Executors;033import java.util.concurrent.Future;034import java.util.concurrent.TimeUnit;035import java.util.concurrent.atomic.AtomicInteger;036import java.util.zip.Deflater;037038import static java.util.Collections.synchronizedList;039import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;040041/**042 * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.043 * <p>044 * Note that this class generally makes no 
guarantees about the order of things written to045 * the output file. Things that need to come in a specific order (manifests, directories)046 * must be handled by the client of this class, usually by writing these things to the047 * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p>048 * <p>049 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of050 * memory model consistency, this will be shut down by this class prior to completion.051 * </p>052 * @since 1.10053 */054public class ParallelScatterZipCreator {055    private final List<ScatterZipOutputStream> streams = synchronizedList(new ArrayList<ScatterZipOutputStream>());056    private final ExecutorService es;057    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;058    private final List<Future<Object>> futures = new ArrayList<Future<Object>>();059060    private final long startedAt = System.currentTimeMillis();061    private long compressionDoneAt = 0;062    private long scatterDoneAt;063064    private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier {065        final AtomicInteger storeNum = new AtomicInteger(0);066067        public ScatterGatherBackingStore get() throws IOException {068            File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet());069            return new FileBasedScatterGatherBackingStore(tempFile);070        }071    }072073    private ScatterZipOutputStream createDeferred(ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)074            throws IOException {075        ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();076        StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);077        return new ScatterZipOutputStream(bs, sc);078    }079080    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new 
ThreadLocal<ScatterZipOutputStream>() {081        @Override082        protected ScatterZipOutputStream initialValue() {083            try {084                ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);085                streams.add(scatterStream);086                return scatterStream;087            } catch (IOException e) {088                throw new RuntimeException(e);089            }090        }091    };092093    /**094     * Create a ParallelScatterZipCreator with default threads, which is set to the number of available095     * processors, as defined by {@link java.lang.Runtime#availableProcessors}096     */097    public ParallelScatterZipCreator() {098        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));099    }100101    /**102     * Create a ParallelScatterZipCreator103     *104     * @param executorService The executorService to use for parallel scheduling. For technical reasons,105     *                        this will be shut down by this class.106     */107    public ParallelScatterZipCreator(ExecutorService executorService) {108        this(executorService, new DefaultBackingStoreSupplier());109    }110111    /**112     * Create a ParallelScatterZipCreator113     *114     * @param executorService The executorService to use. 
For technical reasons, this will be shut down115     *                        by this class.116     * @param backingStoreSupplier The supplier of backing store which shall be used117     */118    public ParallelScatterZipCreator(ExecutorService executorService,119                                     ScatterGatherBackingStoreSupplier backingStoreSupplier) {120        this.backingStoreSupplier = backingStoreSupplier;121        es = executorService;122    }123124    /**125     * Adds an archive entry to this archive.126     * <p>127     * This method is expected to be called from a single client thread128     * </p>129     *130     * @param zipArchiveEntry The entry to add.131     * @param source          The source input stream supplier132     */133134    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {135        submit(createCallable(zipArchiveEntry, source));136    }137138    /**139     * Submit a callable for compression.140     *141     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.142     *143     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.144     */145    public final void submit(Callable<Object> callable) {146        futures.add(es.submit(callable));147    }148149    /**150     * Create a callable that will compress the given archive entry.151     *152     * <p>This method is expected to be called from a single client thread.</p>153     *154     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submit submit}.155     * The most common use case for using {@link #createCallable createCallable} and {@link #submit submit} from a156     * client is if you want to wrap the callable in something that can be prioritized by the supplied157     * {@link ExecutorService}, for instance to process large or slow files first.158     * Since the creation of the {@link 
ExecutorService} is handled by the client, all of this is up to the client.159     *160     * @param zipArchiveEntry The entry to add.161     * @param source          The source input stream supplier162     * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The163     * value of this callable is not used, but any exceptions happening inside the compression164     * will be propagated through the callable.165     */166167    public final Callable<Object> createCallable(ZipArchiveEntry zipArchiveEntry, InputStreamSupplier source) {168        final int method = zipArchiveEntry.getMethod();169        if (method == ZipMethod.UNKNOWN_CODE) {170            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);171        }172        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);173        return new Callable<Object>() {174            public Object call() throws Exception {175                tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequest);176                return null;177            }178        };179    }180181182    /**183     * Write the contents this to the target {@link ZipArchiveOutputStream}.184     * <p>185     * It may be beneficial to write things like directories and manifest files to the targetStream186     * before calling this method.187     * </p>188     *189     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams190     * @throws IOException          If writing fails191     * @throws InterruptedException If we get interrupted192     * @throws ExecutionException   If something happens in the parallel execution193     */194    public void writeTo(ZipArchiveOutputStream targetStream)195            throws IOException, InterruptedException, ExecutionException {196197        // Make sure we catch any exceptions from parallel phase198        for 
(Future<?> future : futures) {199            future.get();200        }201202        es.shutdown();203        es.awaitTermination(1000 * 60, TimeUnit.SECONDS);  // == Infinity. We really *must* wait for this to complete204205        // It is important that all threads terminate before we go on, ensure happens-before relationship206        compressionDoneAt = System.currentTimeMillis();207208        for (ScatterZipOutputStream scatterStream : streams) {209            scatterStream.writeTo(targetStream);210            scatterStream.close();211        }212213        scatterDoneAt = System.currentTimeMillis();214    }215216    /**217     * Returns a message describing the overall statistics of the compression run218     *219     * @return A string220     */221    public ScatterStatistics getStatisticsMessage() {222        return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);223    }224}225


NOTHING
NOTHING
Add the Maven dependency to your project: Maven dependency for com.amazonaws : aws-java-sdk : 1.3.14