Java 8 Stream, obteniendo cabeza y cola

Java 8 introdujo una clase Stream que se asemeja a Scala’s Stream , una poderosa construcción perezosa mediante la cual es posible hacer algo como esto de manera muy concisa:

def from(n: Int): Stream[Int] = n #:: from(n+1) def sieve(s: Stream[Int]): Stream[Int] = { s.head #:: sieve(s.tail filter (_ % s.head != 0)) } val primes = sieve(from(2)) primes takeWhile(_ < 1000) print // prints all primes less than 1000 

Me preguntaba si es posible hacer esto en Java 8, así que escribí algo como esto:

 IntStream from(int n) { return IntStream.iterate(n, m -> m + 1); } IntStream sieve(IntStream s) { int head = s.findFirst().getAsInt(); return IntStream.concat(IntStream.of(head), sieve(s.skip(1).filter(n -> n % head != 0))); } IntStream primes = sieve(from(2)); 

Bastante simple, pero produce java.lang.IllegalStateException: stream has already been operated upon or closed porque tanto findFirst() como skip() son operaciones de terminal en Stream que solo se pueden realizar una vez.

Realmente no tengo que usar la transmisión dos veces ya que todo lo que necesito es el primer número en la transmisión y el rest como otra transmisión, es decir, equivalente de Stream.head y Stream.tail de Scala. ¿Hay algún método en Java 8 Stream que pueda usar para lograr esto?

Gracias.

Incluso si no tiene el problema de no poder dividir un IntStream , el código no funcionó porque está invocando su método de sieve recursivamente en lugar de hacerlo de forma perezosa. Por lo tanto, tenía una recursión de infinito antes de poder consultar la secuencia resultante para el primer valor.

Es posible dividir un IntStream s en una cabeza y un cola IntStream (que aún no se ha consumido):

 PrimitiveIterator.OfInt it = s.iterator(); int head = it.nextInt(); IntStream tail = IntStream.generate(it::next).filter(i -> i % head != 0); 

En este lugar necesitas una construcción de invocando el sieve en la cola perezosamente. Stream no proporciona eso; concat espera instancias de flujo existentes como argumentos y no se puede construir una secuencia invocando sieve perezosamente con una expresión lambda, ya que la creación diferida solo funciona con estado mutable que las expresiones lambda no admiten. Si no tiene una implementación de biblioteca que oculte el estado mutable, debe usar un objeto mutable. Pero una vez que acepta el requisito de estado mutable, la solución puede ser aún más fácil que su primer enfoque:

 IntStream primes = from(2).filter(i -> p.test(i)).peek(i -> p = p.and(v -> v % i != 0)); IntPredicate p = x -> true; IntStream from(int n) { return IntStream.iterate(n, m -> m + 1); } 

Esto creará recursivamente un filtro, pero al final no importa si crea un árbol de IntPredicate o un árbol de IntStream (como con su enfoque IntStream.concat si funcionó). Si no le gusta el campo de instancia mutable para el filtro, puede ocultarlo en una clase interna (pero no en una expresión lambda …).

En esencia, puedes implementarlo así:

 static  Tuple2, Seq> splitAtHead(Stream stream) { Iterator it = stream.iterator(); return tuple(it.hasNext() ? Optional.of(it.next()) : Optional.empty(), seq(it)); } 

En el ejemplo anterior, Tuple2 y Seq son tipos tomados de jOOλ , una biblioteca que desarrollamos para pruebas de integración jOOQ . Si no desea ninguna dependencia adicional, también puede implementarla usted mismo:

 class Tuple2 { final T1 v1; final T2 v2; Tuple2(T1 v1, T2 v2) { this.v1 = v1; this.v2 = v2; } static  Tuple2 tuple(T1 v1, T2 v2) { return new Tuple<>(v1, v2); } } static  Tuple2, Stream> splitAtHead(Stream stream) { Iterator it = stream.iterator(); return tuple( it.hasNext() ? Optional.of(it.next()) : Optional.empty, StreamSupport.stream(Spliterators.spliteratorUnknownSize( it, Spliterator.ORDERED ), false) ); } 

La solución a continuación no hace mutaciones de estado, excepto para la deconstrucción de la cola / cola de la stream.

La pereza se obtiene usando IntStream.iterate. La clase Prime se usa para mantener el estado del generador

  import java.util.PrimitiveIterator; import java.util.stream.IntStream; import java.util.stream.Stream; public class Prime { private final IntStream candidates; private final int current; private Prime(int current, IntStream candidates) { this.current = current; this.candidates = candidates; } private Prime next() { PrimitiveIterator.OfInt it = candidates.filter(n -> n % current != 0).iterator(); int head = it.next(); IntStream tail = IntStream.generate(it::next); return new Prime(head, tail); } public static Stream stream() { IntStream possiblePrimes = IntStream.iterate(3, i -> i + 1); return Stream.iterate(new Prime(2, possiblePrimes), Prime::next) .map(p -> p.current); } } 

El uso sería este:

 Stream first10Primes = Prime.stream().limit(10) 

Mi biblioteca StreamEx tiene ahora la operación headTail() que resuelve el problema:

 public static StreamEx sieve(StreamEx input) { return input.headTail((head, tail) -> sieve(tail.filter(n -> n % head != 0)).prepend(head)); } 

El método headTail toma una BiFunction que se ejecutará como máximo una vez durante la ejecución de la operación del terminal de flujo. Por lo tanto, esta implementación es floja: no computa nada hasta que se inicia el recorrido transversal y solo computa tantos números primos como se solicite. La BiFunction recibe una primera head elemento de flujo y la secuencia de la tail elementos de descanso y puede modificar la tail de la forma que quiera. Puede usarlo con una entrada predefinida:

 sieve(IntStreamEx.range(2, 1000).boxed()).forEach(System.out::println); 

Pero la transmisión infinita también funciona

 sieve(StreamEx.iterate(2, x -> x+1)).takeWhile(x -> x < 1000) .forEach(System.out::println); // Not the primes till 1000, but 1000 first primes sieve(StreamEx.iterate(2, x -> x+1)).limit(1000).forEach(System.out::println); 

También hay una solución alternativa usando headTail y concatenación de predicados:

 public static StreamEx sieve(StreamEx input, IntPredicate isPrime) { return input.headTail((head, tail) -> isPrime.test(head) ? sieve(tail, isPrime.and(n -> n % head != 0)).prepend(head) : sieve(tail, isPrime)); } sieve(StreamEx.iterate(2, x -> x+1), i -> true).limit(1000).forEach(System.out::println); 

Es interesante comparar soluciones recursivas: cuántos primos son capaces de generar.

@John McClean solución ( StreamUtils )

Las soluciones de John McClean no son flojas: no puedes alimentarlas con flujo infinito. Así que acabo de encontrar por prueba y error el máximo límite máximo permitido ( 17793 ) (después de que se produce StackOverflowError):

 public void sieveTest(){ sieve(IntStream.range(2, 17793).boxed()).forEach(System.out::println); } 

@John McClean solution ( Streamable )

 public void sieveTest2(){ sieve(Streamable.range(2, 39990)).forEach(System.out::println); } 

El aumento del límite superior por encima de 39990 da 39990 resultado StackOverflowError.

@frhack solution ( LazySeq )

 LazySeq ints = integers(2); LazySeq primes = sieve(ints); // sieve method from @frhack answer primes.forEach(p -> System.out.println(p)); 

Resultado: atascado después del número primo = 53327 con una enorme asignación de montón y recolección de basura que toma más del 90%. Pasaron varios minutos para pasar de 53323 a 53327, por lo que esperar más parece poco práctico.

@vidi solución

 Prime.stream().forEach(System.out::println); 

Resultado: StackOverflowError después del número primo = 134417 .

Mi solución (StreamEx)

 sieve(StreamEx.iterate(2, x -> x+1)).forEach(System.out::println); 

Resultado: StackOverflowError después del número primo = 236167 .

@frhack solution ( rxjava )

 Observable primes = Observable.from(()->primesStream.iterator()); primes.forEach((x) -> System.out.println(x.toString())); 

Resultado: StackOverflowError después del número primo = 367663 .

@Holger solución

 IntStream primes=from(2).filter(i->p.test(i)).peek(i->p=p.and(v->v%i!=0)); primes.forEach(System.out::println); 

Resultado: StackOverflowError después del número primo = 368089 .

Mi solución (StreamEx con concatenación de predicados)

 sieve(StreamEx.iterate(2, x -> x+1), i -> true).forEach(System.out::println); 

Resultado: StackOverflowError después del número primo = 368287 .


Así que ganan tres soluciones que implican concatenación de predicados, porque cada condición nueva agrega solo 2 marcos de stack más. Creo que la diferencia entre ellos es marginal y no se debe considerar que define un ganador. Sin embargo, me gusta más mi primera solución StreamEx ya que es más similar al código Scala.

Si no te importa utilizar bibliotecas de terceros cyclops-streams , la biblioteca que escribí tiene una serie de posibles soluciones.

La clase StreamUtils tiene una gran cantidad de métodos estáticos para trabajar directamente con java.util.stream.Streams incluido headAndTail .

 HeadAndTail headAndTail = StreamUtils.headAndTail(Stream.of(1,2,3,4)); int head = headAndTail.head(); //1 Stream tail = headAndTail.tail(); //Stream[2,3,4] 

La clase Streamable representa una Stream rejugable y funciona mediante la creación de una estructura de datos intermedia en memoria caché. Debido a que es el almacenamiento en caché y reembolsable, cabe la posibilidad de implementar la cabecera y la cola directamente y por separado.

 Streamable replayable= Streamable.fromStream(Stream.of(1,2,3,4)); int head = repayable.head(); //1 Stream tail = replayable.tail(); //Stream[2,3,4] 

cyclops-streams también proporciona una extensión Stream secuencial que a su vez se extiende jOOλ y tiene soluciones basadas en Tuple (desde jOOλ) y objeto de dominio (HeadAndTail) para la extracción de cabeza y cola.

 SequenceM.of(1,2,3,4) .splitAtHead(); //Tuple[1,SequenceM[2,3,4] SequenceM.of(1,2,3,4) .headAndTail(); 

Actualización por solicitud de Tagir -> Una versión Java del tamiz Scala usando SequenceM

 public void sieveTest(){ sieve(SequenceM.range(2, 1_000)).forEach(System.out::println); } SequenceM sieve(SequenceM s){ return s.headAndTailOptional().map(ht ->SequenceM.of(ht.head()) .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0)))) .orElse(SequenceM.of()); } 

Y otra versión vía Streamable

 public void sieveTest2(){ sieve(Streamable.range(2, 1_000)).forEach(System.out::println); } Streamable sieve(Streamable s){ return s.size()==0? Streamable.of() : Streamable.of(s.head()) .appendStreamable(sieve(s.tail() .filter(n -> n % s.head() != 0))); } 

Nota: ni Streamable of SequenceM tiene una implementación vacía, de ahí la verificación de tamaño para Streamable y el uso de headAndTailOptional .

Finalmente una versión que usa java.util.stream.Stream simple

 import static com.aol.cyclops.streams.StreamUtils.headAndTailOptional; public void sieveTest(){ sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println); } Stream sieve(Stream s){ return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head()) ,sieve(ht.tail().filter(n -> n % ht.head() != 0)))) .orElse(Stream.of()); } 

Otra actualización: una iteración diferida basada en la versión de @ Holger que usa objetos en lugar de primitivos (tenga en cuenta que también es posible una versión primitiva)

  final Mutable> predicate = Mutable.of(x->true); SequenceM.iterate(2, n->n+1) .filter(i->predicate.get().test(i)) .peek(i->predicate.mutate(p-> p.and(v -> v%i!=0))) .limit(100000) .forEach(System.out::println); 

Para obtener cabeza y cola, necesita una implementación de Lazy Stream. Java 8 stream o RxJava no son adecuados.

Puede usar, por ejemplo, LazySeq de la siguiente manera.

La secuencia floja siempre se atraviesa desde el principio usando descomposición de primer / rest muy barata (cabeza () y cola ())

LazySeq implementa la interfaz java.util.List, por lo tanto, se puede usar en una variedad de lugares. Además, también implementa mejoras de Java 8 para colecciones, principalmente transmisiones y colectores


 package com.company; import com.nurkiewicz.lazyseq.LazySeq; public class Main { public static void main(String[] args) { LazySeq ints = integers(2); LazySeq primes = sieve(ints); primes.take(10).forEach(p -> System.out.println(p)); } private static LazySeq sieve(LazySeq s) { return LazySeq.cons(s.head(), () -> sieve(s.filter(x -> x % s.head() != 0))); } private static LazySeq integers(int from) { return LazySeq.cons(from, () -> integers(from + 1)); } } 

Aquí hay otra receta que usa la forma sugerida por Holger. Utiliza RxJava solo para agregar la posibilidad de usar el método take (int) y muchos otros.

 package com.company; import rx.Observable; import java.util.function.IntPredicate; import java.util.stream.IntStream; public class Main { public static void main(String[] args) { final IntPredicate[] p={(x)->true}; IntStream primesStream=IntStream.iterate(2,n->n+1).filter(i -> p[0].test(i)).peek(i->p[0]=p[0].and(v->v%i!=0) ); Observable primes = Observable.from(()->primesStream.iterator()); primes.take(10).forEach((x) -> System.out.println(x.toString())); } } 

Si quieres obtener la cabecera de una secuencia, simplemente:

 IntStream.range(1, 5).first(); 

Si quieres obtener la cola de una secuencia, simplemente:

 IntStream.range(1, 5).skip(1); 

Si quieres obtener la cabecera y la cola de una secuencia, simplemente:

 IntStream s = IntStream.range(1, 5); int head = s.head(); IntStream tail = s.tail(); 

Si quieres encontrar el mejor, simplemente:

 LongStream.range(2, n) .filter(i -> LongStream.range(2, (long) Math.sqrt(i) + 1).noneMatch(j -> i % j == 0)) .forEach(N::println); 

Si quieres saber más, ve a buscar AbacusUtil

Declaración: soy el desarrollador de AbacusUtil.