001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.io.function;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Objects;
026import java.util.Optional;
027import java.util.Spliterator;
028import java.util.Spliterators;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.atomic.AtomicReference;
031import java.util.function.BiFunction;
032import java.util.function.IntFunction;
033import java.util.function.ToDoubleFunction;
034import java.util.function.ToIntFunction;
035import java.util.function.ToLongFunction;
036import java.util.function.UnaryOperator;
037import java.util.stream.Collector;
038import java.util.stream.DoubleStream;
039import java.util.stream.IntStream;
040import java.util.stream.LongStream;
041import java.util.stream.Stream;
042import java.util.stream.StreamSupport;
043
044import org.apache.commons.io.IOExceptionList;
045
046/**
047 * Like {@link Stream} but throws {@link IOException}.
048 *
049 * @param <T> the type of the stream elements.
050 * @since 2.12.0
051 */
052public interface IOStream<T> extends IOBaseStream<T, IOStream<T>, Stream<T>> {
053
054    /**
055     * Constructs a new IOStream for the given Stream.
056     *
057     * @param <T> the type of the stream elements.
058     * @param stream The stream to delegate.
059     * @return a new IOStream.
060     */
061    static <T> IOStream<T> adapt(final Stream<T> stream) {
062        return IOStreamAdapter.adapt(stream);
063    }
064
065    /**
066     * This class' version of {@link Stream#empty()}.
067     *
068     * @param <T> the type of the stream elements
069     * @return an empty sequential {@code IOStreamImpl}.
070     * @see Stream#empty()
071     */
072    static <T> IOStream<T> empty() {
073        return IOStreamAdapter.adapt(Stream.empty());
074    }
075
076    /**
077     * Like {@link Stream#iterate(Object, UnaryOperator)} but for IO.
078     *
079     * @param <T> the type of stream elements.
080     * @param seed the initial element.
081     * @param f a function to be applied to the previous element to produce a new element.
082     * @return a new sequential {@code IOStream}.
083     */
084    static <T> IOStream<T> iterate(final T seed, final IOUnaryOperator<T> f) {
085        Objects.requireNonNull(f);
086        final Iterator<T> iterator = new Iterator<T>() {
087            @SuppressWarnings("unchecked")
088            T t = (T) IOStreams.NONE;
089
090            @Override
091            public boolean hasNext() {
092                return true;
093            }
094
095            @Override
096            public T next() {
097                return t = t == IOStreams.NONE ? seed : Erase.apply(f, t);
098            }
099        };
100        return adapt(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false));
101    }
102
103    /**
104     * Null-safe version of {@link StreamSupport#stream(java.util.Spliterator, boolean)}.
105     *
106     * Copied from Apache Commons Lang.
107     *
108     * @param <T> the type of stream elements.
109     * @param values the elements of the new stream, may be {@code null}.
110     * @return the new stream on {@code values} or {@link Stream#empty()}.
111     */
112    @SuppressWarnings("resource") // call to #empty()
113    static <T> IOStream<T> of(final Iterable<T> values) {
114        return values == null ? empty() : adapt(StreamSupport.stream(values.spliterator(), false));
115    }
116
117    /**
118     * Null-safe version of {@link Stream#of(Object[])} for an IO stream.
119     *
120     * @param <T> the type of stream elements.
121     * @param values the elements of the new stream, may be {@code null}.
122     * @return the new stream on {@code values} or {@link Stream#empty()}.
123     */
124    @SuppressWarnings("resource")
125    @SafeVarargs // Creating a stream from an array is safe
126    static <T> IOStream<T> of(final T... values) {
127        return values == null || values.length == 0 ? empty() : adapt(Arrays.stream(values));
128    }
129
130    /**
131     * Returns a sequential {@code IOStreamImpl} containing a single element.
132     *
133     * @param t the single element
134     * @param <T> the type of stream elements
135     * @return a singleton sequential stream
136     */
137    static <T> IOStream<T> of(final T t) {
138        return adapt(Stream.of(t));
139    }
140
141    /**
142     * Like {@link Stream#allMatch(java.util.function.Predicate)} but throws {@link IOException}.
143     *
144     * @param predicate {@link Stream#allMatch(java.util.function.Predicate)}.
145     * @return Like {@link Stream#allMatch(java.util.function.Predicate)}.
146     * @throws IOException if an I/O error occurs.
147     */
148    @SuppressWarnings("unused") // thrown by Erase.
149    default boolean allMatch(final IOPredicate<? super T> predicate) throws IOException {
150        return unwrap().allMatch(t -> Erase.test(predicate, t));
151    }
152
153    /**
154     * Like {@link Stream#anyMatch(java.util.function.Predicate)} but throws {@link IOException}.
155     *
156     * @param predicate {@link Stream#anyMatch(java.util.function.Predicate)}.
157     * @return Like {@link Stream#anyMatch(java.util.function.Predicate)}.
158     * @throws IOException if an I/O error occurs.
159     */
160    @SuppressWarnings("unused") // thrown by Erase.
161    default boolean anyMatch(final IOPredicate<? super T> predicate) throws IOException {
162        return unwrap().anyMatch(t -> Erase.test(predicate, t));
163    }
164
165    /**
166     * TODO Package-private for now, needs IOCollector?
167     *
168     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
169     * would be ideal to have only one.
170     *
171     * Like {@link Stream#collect(Collector)}.
172     *
173     * Package private for now.
174     *
175     * @param <R> Like {@link Stream#collect(Collector)}.
176     * @param <A> Like {@link Stream#collect(Collector)}.
177     * @param collector Like {@link Stream#collect(Collector)}.
178     * @return Like {@link Stream#collect(Collector)}.
179     */
180    default <R, A> R collect(final Collector<? super T, A, R> collector) {
181        return unwrap().collect(collector);
182    }
183
184    /**
185     * Like
186     * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
187     *
188     * @param <R> Like
189     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
190     * @param supplier Like
191     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
192     * @param accumulator Like
193     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
194     * @param combiner Like
195     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
196     * @return Like
197     *         {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
198     * @throws IOException if an I/O error occurs.
199     */
200    @SuppressWarnings("unused") // thrown by Erase.
201    default <R> R collect(final IOSupplier<R> supplier, final IOBiConsumer<R, ? super T> accumulator, final IOBiConsumer<R, R> combiner) throws IOException {
202        return unwrap().collect(() -> Erase.get(supplier), (t, u) -> Erase.accept(accumulator, t, u), (t, u) -> Erase.accept(combiner, t, u));
203    }
204
205    /**
206     * Like {@link Stream#count()}.
207     *
208     * @return Like {@link Stream#count()}.
209     */
210    default long count() {
211        return unwrap().count();
212    }
213
214    /**
215     * Like {@link Stream#distinct()}.
216     *
217     * @return Like {@link Stream#distinct()}.
218     */
219    default IOStream<T> distinct() {
220        return adapt(unwrap().distinct());
221    }
222
223    /**
224     * Like {@link Stream#filter(java.util.function.Predicate)}.
225     *
226     * @param predicate Like {@link Stream#filter(java.util.function.Predicate)}.
227     * @return Like {@link Stream#filter(java.util.function.Predicate)}.
228     * @throws IOException if an I/O error occurs.
229     */
230    @SuppressWarnings("unused") // thrown by Erase.
231    default IOStream<T> filter(final IOPredicate<? super T> predicate) throws IOException {
232        return adapt(unwrap().filter(t -> Erase.test(predicate, t)));
233    }
234
235    /**
236     * Like {@link Stream#findAny()}.
237     *
238     * @return Like {@link Stream#findAny()}.
239     */
240    default Optional<T> findAny() {
241        return unwrap().findAny();
242    }
243
244    /**
245     * Like {@link Stream#findFirst()}.
246     *
247     * @return Like {@link Stream#findFirst()}.
248     */
249    default Optional<T> findFirst() {
250        return unwrap().findFirst();
251    }
252
253    /**
254     * Like {@link Stream#flatMap(java.util.function.Function)}.
255     *
256     * @param <R> Like {@link Stream#flatMap(java.util.function.Function)}.
257     * @param mapper Like {@link Stream#flatMap(java.util.function.Function)}.
258     * @return Like {@link Stream#flatMap(java.util.function.Function)}.
259     * @throws IOException if an I/O error occurs.
260     */
261    @SuppressWarnings("unused") // thrown by Erase.
262    default <R> IOStream<R> flatMap(final IOFunction<? super T, ? extends IOStream<? extends R>> mapper) throws IOException {
263        return adapt(unwrap().flatMap(t -> Erase.apply(mapper, t).unwrap()));
264    }
265
266    /**
267     * TODO Package-private for now, needs IODoubleStream?
268     *
269     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
270     * would be ideal to have only one.
271     *
272     * Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
273     *
274     * @param mapper Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
275     * @return Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
276     * @throws IOException if an I/O error occurs.
277     */
278    @SuppressWarnings("unused") // thrown by Erase.
279    default DoubleStream flatMapToDouble(final IOFunction<? super T, ? extends DoubleStream> mapper) throws IOException {
280        return unwrap().flatMapToDouble(t -> Erase.apply(mapper, t));
281    }
282
283    /**
284     * TODO Package-private for now, needs IOIntStream?
285     *
286     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
287     * would be ideal to have only one.
288     *
289     * Like {@link Stream#flatMapToInt(java.util.function.Function)}.
290     *
291     * @param mapper Like {@link Stream#flatMapToInt(java.util.function.Function)}.
292     * @return Like {@link Stream#flatMapToInt(java.util.function.Function)}.
293     * @throws IOException if an I/O error occurs.
294     */
295    @SuppressWarnings("unused") // thrown by Erase.
296    default IntStream flatMapToInt(final IOFunction<? super T, ? extends IntStream> mapper) throws IOException {
297        return unwrap().flatMapToInt(t -> Erase.apply(mapper, t));
298    }
299
300    /**
301     * TODO Package-private for now, needs IOLongStream?
302     *
303     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
304     * would be ideal to have only one.
305     *
306     * Like {@link Stream#flatMapToLong(java.util.function.Function)}.
307     *
308     * @param mapper Like {@link Stream#flatMapToLong(java.util.function.Function)}.
309     * @return Like {@link Stream#flatMapToLong(java.util.function.Function)}.
310     * @throws IOException if an I/O error occurs.
311     */
312    @SuppressWarnings("unused") // thrown by Erase.
313    default LongStream flatMapToLong(final IOFunction<? super T, ? extends LongStream> mapper) throws IOException {
314        return unwrap().flatMapToLong(t -> Erase.apply(mapper, t));
315    }
316
317    /**
318     * Performs an action for each element gathering any exceptions.
319     *
320     * @param action The action to apply to each element.
321     * @throws IOExceptionList if any I/O errors occur.
322     */
323    default void forAll(final IOConsumer<T> action) throws IOExceptionList {
324        forAll(action, (i, e) -> e);
325    }
326
327    /**
328     * Performs an action for each element gathering any exceptions.
329     *
330     * @param action The action to apply to each element.
331     * @param exSupplier The exception supplier.
332     * @throws IOExceptionList if any I/O errors occur.
333     */
334    default void forAll(final IOConsumer<T> action, final BiFunction<Integer, IOException, IOException> exSupplier) throws IOExceptionList {
335        final AtomicReference<List<IOException>> causeList = new AtomicReference<>();
336        final AtomicInteger index = new AtomicInteger();
337        final IOConsumer<T> safeAction = IOStreams.toIOConsumer(action);
338        unwrap().forEach(e -> {
339            try {
340                safeAction.accept(e);
341            } catch (final IOException innerEx) {
342                if (causeList.get() == null) {
343                    // Only allocate if required
344                    causeList.set(new ArrayList<>());
345                }
346                if (exSupplier != null) {
347                    causeList.get().add(exSupplier.apply(index.get(), innerEx));
348                }
349            }
350            index.incrementAndGet();
351        });
352        IOExceptionList.checkEmpty(causeList.get(), null);
353    }
354
355    /**
356     * Like {@link Stream#forEach(java.util.function.Consumer)} but throws {@link IOException}.
357     *
358     * @param action Like {@link Stream#forEach(java.util.function.Consumer)}.
359     * @throws IOException if an I/O error occurs.
360     */
361    @SuppressWarnings("unused") // thrown by Erase.
362    default void forEach(final IOConsumer<? super T> action) throws IOException {
363        unwrap().forEach(e -> Erase.accept(action, e));
364    }
365
366    /**
367     * Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
368     *
369     * @param action Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
370     * @throws IOException if an I/O error occurs.
371     */
372    @SuppressWarnings("unused") // thrown by Erase.
373    default void forEachOrdered(final IOConsumer<? super T> action) throws IOException {
374        unwrap().forEachOrdered(e -> Erase.accept(action, e));
375    }
376
377    /**
378     * Like {@link Stream#limit(long)}.
379     *
380     * @param maxSize Like {@link Stream#limit(long)}.
381     * @return Like {@link Stream#limit(long)}.
382     */
383    default IOStream<T> limit(final long maxSize) {
384        return adapt(unwrap().limit(maxSize));
385    }
386
387    /**
388     * Like {@link Stream#map(java.util.function.Function)}.
389     *
390     * @param <R> Like {@link Stream#map(java.util.function.Function)}.
391     * @param mapper Like {@link Stream#map(java.util.function.Function)}.
392     * @return Like {@link Stream#map(java.util.function.Function)}.
393     * @throws IOException if an I/O error occurs.
394     */
395    @SuppressWarnings("unused") // thrown by Erase.
396    default <R> IOStream<R> map(final IOFunction<? super T, ? extends R> mapper) throws IOException {
397        return adapt(unwrap().map(t -> Erase.apply(mapper, t)));
398    }
399
400    /**
401     * TODO Package-private for now, needs IOToDoubleFunction?
402     *
403     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
404     * would be ideal to have only one.
405     *
406     * Like {@link Stream#mapToDouble(ToDoubleFunction)}.
407     *
408     * Package private for now.
409     *
410     * @param mapper Like {@link Stream#mapToDouble(ToDoubleFunction)}.
411     * @return Like {@link Stream#mapToDouble(ToDoubleFunction)}.
412     */
413    default DoubleStream mapToDouble(final ToDoubleFunction<? super T> mapper) {
414        return unwrap().mapToDouble(mapper);
415    }
416
417    /**
418     * TODO Package-private for now, needs IOToIntFunction?
419     *
420     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
421     * would be ideal to have only one.
422     *
423     * Like {@link Stream#mapToInt(ToIntFunction)}.
424     *
425     * Package private for now.
426     *
427     * @param mapper Like {@link Stream#mapToInt(ToIntFunction)}.
428     * @return Like {@link Stream#mapToInt(ToIntFunction)}.
429     */
430    default IntStream mapToInt(final ToIntFunction<? super T> mapper) {
431        return unwrap().mapToInt(mapper);
432    }
433
434    /**
435     * TODO Package-private for now, needs IOToLongFunction?
436     *
437     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
438     * would be ideal to have only one.
439     *
440     * Like {@link Stream#mapToLong(ToLongFunction)}.
441     *
442     * Package private for now.
443     *
444     * @param mapper Like {@link Stream#mapToLong(ToLongFunction)}.
445     * @return Like {@link Stream#mapToLong(ToLongFunction)}.
446     */
447    default LongStream mapToLong(final ToLongFunction<? super T> mapper) {
448        return unwrap().mapToLong(mapper);
449    }
450
451    /**
452     * Like {@link Stream#max(java.util.Comparator)}.
453     *
454     * @param comparator Like {@link Stream#max(java.util.Comparator)}.
455     * @return Like {@link Stream#max(java.util.Comparator)}.
456     * @throws IOException if an I/O error occurs.
457     */
458    @SuppressWarnings("unused") // thrown by Erase.
459    default Optional<T> max(final IOComparator<? super T> comparator) throws IOException {
460        return unwrap().max((t, u) -> Erase.compare(comparator, t, u));
461    }
462
463    /**
464     * Like {@link Stream#min(java.util.Comparator)}.
465     *
466     * @param comparator Like {@link Stream#min(java.util.Comparator)}.
467     * @return Like {@link Stream#min(java.util.Comparator)}.
468     * @throws IOException if an I/O error occurs.
469     */
470    @SuppressWarnings("unused") // thrown by Erase.
471    default Optional<T> min(final IOComparator<? super T> comparator) throws IOException {
472        return unwrap().min((t, u) -> Erase.compare(comparator, t, u));
473    }
474
475    /**
476     * Like {@link Stream#noneMatch(java.util.function.Predicate)}.
477     *
478     * @param predicate Like {@link Stream#noneMatch(java.util.function.Predicate)}.
479     * @return Like {@link Stream#noneMatch(java.util.function.Predicate)}.
480     * @throws IOException if an I/O error occurs.
481     */
482    @SuppressWarnings("unused") // thrown by Erase.
483    default boolean noneMatch(final IOPredicate<? super T> predicate) throws IOException {
484        return unwrap().noneMatch(t -> Erase.test(predicate, t));
485    }
486
487    /**
488     * Like {@link Stream#peek(java.util.function.Consumer)}.
489     *
490     * @param action Like {@link Stream#peek(java.util.function.Consumer)}.
491     * @return Like {@link Stream#peek(java.util.function.Consumer)}.
492     * @throws IOException if an I/O error occurs.
493     */
494    @SuppressWarnings("unused") // thrown by Erase.
495    default IOStream<T> peek(final IOConsumer<? super T> action) throws IOException {
496        return adapt(unwrap().peek(t -> Erase.accept(action, t)));
497    }
498
499    /**
500     * Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
501     *
502     * @param accumulator Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
503     * @return Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
504     * @throws IOException if an I/O error occurs.
505     */
506    @SuppressWarnings("unused") // thrown by Erase.
507    default Optional<T> reduce(final IOBinaryOperator<T> accumulator) throws IOException {
508        return unwrap().reduce((t, u) -> Erase.apply(accumulator, t, u));
509    }
510
511    /**
512     * Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
513     *
514     * @param identity Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
515     * @param accumulator Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
516     * @return Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
517     * @throws IOException if an I/O error occurs.
518     */
519    @SuppressWarnings("unused") // thrown by Erase.
520    default T reduce(final T identity, final IOBinaryOperator<T> accumulator) throws IOException {
521        return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u));
522    }
523
524    /**
525     * Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
526     *
527     * @param <U> Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
528     * @param identity Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
529     * @param accumulator Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
530     * @param combiner Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
531     * @return Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
532     * @throws IOException if an I/O error occurs.
533     */
534    @SuppressWarnings("unused") // thrown by Erase.
535    default <U> U reduce(final U identity, final IOBiFunction<U, ? super T, U> accumulator, final IOBinaryOperator<U> combiner) throws IOException {
536        return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u), (t, u) -> Erase.apply(combiner, t, u));
537    }
538
539    /**
540     * Like {@link Stream#skip(long)}.
541     *
542     * @param n Like {@link Stream#skip(long)}.
543     * @return Like {@link Stream#skip(long)}.
544     */
545    default IOStream<T> skip(final long n) {
546        return adapt(unwrap().skip(n));
547    }
548
549    /**
550     * Like {@link Stream#sorted()}.
551     *
552     * @return Like {@link Stream#sorted()}.
553     */
554    default IOStream<T> sorted() {
555        return adapt(unwrap().sorted());
556    }
557
558    /**
559     * Like {@link Stream#sorted(java.util.Comparator)}.
560     *
561     * @param comparator Like {@link Stream#sorted(java.util.Comparator)}.
562     * @return Like {@link Stream#sorted(java.util.Comparator)}.
563     * @throws IOException if an I/O error occurs.
564     */
565    @SuppressWarnings("unused") // thrown by Erase.
566    default IOStream<T> sorted(final IOComparator<? super T> comparator) throws IOException {
567        return adapt(unwrap().sorted((t, u) -> Erase.compare(comparator, t, u)));
568    }
569
570    /**
571     * Like {@link Stream#toArray()}.
572     *
573     * @return {@link Stream#toArray()}.
574     */
575    default Object[] toArray() {
576        return unwrap().toArray();
577    }
578
579    /**
580     * TODO Package-private for now, needs IOIntFunction?
581     *
582     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
583     * would be ideal to have only one.
584     *
585     * Like {@link Stream#toArray(IntFunction)}.
586     *
587     * Package private for now.
588     *
589     * @param <A> Like {@link Stream#toArray(IntFunction)}.
590     * @param generator Like {@link Stream#toArray(IntFunction)}.
591     * @return Like {@link Stream#toArray(IntFunction)}.
592     */
593    default <A> A[] toArray(final IntFunction<A[]> generator) {
594        return unwrap().toArray(generator);
595    }
596
597}