zio-archive/interop-reactive-streams
{ "createdAt": "2019-06-09T15:53:25Z", "defaultBranch": "master", "description": "Interoperability Layer Between ZIO and Reactive Streams", "fullName": "zio-archive/interop-reactive-streams", "homepage": "https://zio.dev/zio-interop-reactivestreams", "language": "Scala", "name": "interop-reactive-streams", "pushedAt": "2024-05-30T06:40:55Z", "stargazersCount": 47, "topics": [ "reactive-streams", "scala", "zio" ], "updatedAt": "2025-01-20T13:02:38Z", "url": "https://github.com/zio-archive/interop-reactive-streams"}
[//]: # (This file was autogenerated using zio-sbt-website plugin via sbt generateReadme command.)
[//]: # (So please do not edit it manually. Instead, change "docs/index.md" file or sbt setting keys)
[//]: # (e.g. "readmeDocumentation" and "readmeSupport".)
ZIO Interop Reactive Streams
This library provides an interoperability layer between ZIO and Reactive Streams.
Introduction
ZIO integrates with Reactive Streams by providing conversions from `zio.stream.Stream` to `org.reactivestreams.Publisher` and from `zio.stream.Sink` to `org.reactivestreams.Subscriber`, and vice versa. Import `zio.interop.reactivestreams._` to make the conversions available.
Installation
To use this library, we need to add the following line to our `build.sbt` file:

    libraryDependencies += "dev.zio" %% "zio-interop-reactivestreams" % "2.0.2"

Examples
First, let’s get a few imports out of the way.

    import org.reactivestreams.example.unicast._
    import zio._
    import zio.interop.reactivestreams._
    import zio.stream._

We use the following Publisher and Subscriber for the examples:

    val publisher = new RangePublisher(3, 10)
    val subscriber = new SyncSubscriber[Int] {
      override protected def whenNext(v: Int): Boolean = {
        print(s"$v, ")
        true
      }
    }

Publisher to Stream
A Publisher used as a Stream buffers up to `qSize` elements. If possible, `qSize` should be a power of two for best performance. The default is 16.
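
As a minimal, self-contained sketch of the conversion described above, the program below wraps a Publisher in a stream and collects its elements. It assumes ZIO 2.x and that the reactive-streams-examples artifact (which provides `RangePublisher`) is on the classpath. The names `PublisherToStreamExample` and `collected` are illustrative and not part of the library.

```scala
import org.reactivestreams.example.unicast.RangePublisher
import zio._
import zio.interop.reactivestreams._
import zio.stream._

// Illustrative name; not part of the library.
object PublisherToStreamExample extends ZIOAppDefault {

  // RangePublisher(start, count) is assumed to come from reactive-streams-examples.
  private val publisher = new RangePublisher(3, 10)

  // Buffer up to 16 elements (a power of two) between the Publisher and the stream,
  // then collect everything the Publisher emits into a Chunk.
  val collected: ZIO[Any, Throwable, Chunk[Integer]] =
    publisher.toZIOStream(qSize = 16).runCollect

  override def run: ZIO[Any, Throwable, Unit] =
    collected.flatMap(elements => Console.printLine(elements.mkString(", ")))
}
```

The stream carries `java.lang.Integer` elements because `RangePublisher` is a Java `Publisher<Integer>`; they can be mapped to `Int` with `.map(_.intValue)` if needed.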

    val streamFromPublisher = publisher.toZIOStream(qSize = 16)
    streamFromPublisher.run(ZSink.collectAll[Integer])

Subscriber to Sink
When running a Stream to a Subscriber, a side channel is needed for signalling failures.
For this reason, `toZIOSink` returns a tuple of a callback and a Sink. The callback must be used to signal Stream failure. The type parameter on `toZIOSink` is the error type of the Stream.
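
To make the role of the callback more concrete, here is a hedged sketch that uses a hand-written Subscriber instead of the example one. It assumes ZIO 2.x. The names `SubscriberToSinkExample`, `recordingSubscriber`, `received`, and `program` are hypothetical and exist only for illustration. The subscriber records every signal it receives so that they can be inspected after the run.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import org.reactivestreams.{Subscriber, Subscription}
import zio._
import zio.interop.reactivestreams._
import zio.stream._

// Illustrative names; not part of the library.
object SubscriberToSinkExample extends ZIOAppDefault {

  // Signals received by the subscriber, in arrival order.
  val received = new CopyOnWriteArrayList[String]()

  // A hypothetical subscriber that requests everything up front and records each signal.
  private val recordingSubscriber: Subscriber[Int] = new Subscriber[Int] {
    override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
    override def onNext(v: Int): Unit               = { received.add(s"onNext($v)"); () }
    override def onError(t: Throwable): Unit        = { received.add(s"onError(${t.getMessage})"); () }
    override def onComplete(): Unit                 = { received.add("onComplete"); () }
  }

  private val failingStream =
    ZStream.range(1, 4) ++ ZStream.fail(new RuntimeException("boom!"))

  val program: ZIO[Any, Nothing, Unit] =
    ZIO.scoped {
      recordingSubscriber.toZIOSink[Throwable].flatMap { case (signalError, sink) =>
        // The sink alone cannot carry the stream's failure to the subscriber,
        // so the failure is handed to the callback instead.
        failingStream.run(sink).catchAll(signalError)
      }
    }

  override def run: ZIO[Any, Nothing, Unit] =
    program *> Console.printLine(received.toString).orDie
}
```

Running the object prints the recorded signals; the `onNext` entries appear in stream order.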

    val asSink = subscriber.toZIOSink[Throwable]
    val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
    ZIO.scoped {
      asSink.flatMap { case (signalError, sink) => // FIXME
        failingStream.run(sink).catchAll(signalError)
      }
    }

Stream to Publisher

    val stream = ZStream.range(3, 13)
    stream.toPublisher.flatMap { publisher =>
      ZIO.succeed(publisher.subscribe(subscriber))
    }

Sink to Subscriber
`toSubscriber` returns a Subscriber and an IO which completes with the result of running the Sink, or with the error if the Publisher fails.
A Sink used as a Subscriber buffers up to `qSize` elements. If possible, `qSize` should be a power of two for best performance. The default is 16.
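
The following sketch shows one way to use both halves of the returned tuple in a complete program: the Subscriber is handed to a Publisher, and the IO is awaited for the Sink's result. It assumes ZIO 2.x and the reactive-streams-examples artifact for `RangePublisher`. The names `SinkToSubscriberExample` and `collected` are illustrative only.

```scala
import org.reactivestreams.example.unicast.RangePublisher
import zio._
import zio.interop.reactivestreams._
import zio.stream._

// Illustrative name; not part of the library.
object SinkToSubscriberExample extends ZIOAppDefault {

  // RangePublisher(start, count) is assumed to come from reactive-streams-examples.
  private val publisher = new RangePublisher(3, 10)

  private val sink = ZSink.collectAll[Integer]

  val collected: ZIO[Any, Throwable, Chunk[Integer]] =
    ZIO.scoped {
      sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
        // Subscribing starts the flow of elements; `result` completes with the
        // Sink's output, or fails if the Publisher signals an error.
        ZIO.succeed(publisher.subscribe(subscriber)) *> result
      }
    }

  override def run: ZIO[Any, Throwable, Unit] =
    collected.flatMap(elements => Console.printLine(elements.mkString(", ")))
}
```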

    val sink = ZSink.collectAll[Integer]
    ZIO.scoped {
      sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
        ZIO.succeed(publisher.subscribe(subscriber)) *> result
      }
    }

Documentation
Learn more on the [ZIO Interop Reactive Streams homepage](https://zio.dev/zio-interop-reactivestreams)!
Contributing
For the general guidelines, see the ZIO contributor’s guide.
Code of Conduct
See the Code of Conduct.
Support
Come chat with us on [![Badge-Discord]][Link-Discord].

[Badge-Discord]: https://img.shields.io/discord/629491597070827530?logo=discord "chat on discord"
[Link-Discord]: https://discord.gg/2ccFBr4 "Discord"
License
[License](LICENSE)