teohhanhui/callbag-rs
{ "createdAt": "2021-11-03T14:58:50Z", "defaultBranch": "main", "description": "Rust implementation of the callbag spec for reactive/iterable programming", "fullName": "teohhanhui/callbag-rs", "homepage": "", "language": "Rust", "name": "callbag-rs", "pushedAt": "2022-05-11T14:03:39Z", "stargazersCount": 30, "topics": [ "callbag", "frp", "iterable", "observable", "reactive", "rust", "stream" ], "updatedAt": "2023-08-04T09:57:59Z", "url": "https://github.com/teohhanhui/callbag-rs"}callbag-rs
Section titled “callbag-rs”Rust implementation of the [callbag spec][callbag-spec] for reactive/iterable programming.
Basic [callbag][callbag-spec] factories and operators to get started with.
Highlights:
- Supports reactive stream programming
- Supports iterable programming (also!)
- Same operator works for both of the above
- Extensible
Imagine a hybrid between an [Observable][tc39-observable] and an [(Async)Iterable][tc39-async-iteration], that’s what callbags are all about. It’s all done with a few simple callbacks, following the [callbag spec][callbag-spec].
[![CI][ci-badge]][ci-url] [![Crates.io][crates-badge]][crates-url] [![Documentation][docs-badge]][docs-url] [![MIT OR Apache-2.0 licensed][license-badge]][license-url]
Examples
Section titled “Examples”Reactive programming examples
Section titled “Reactive programming examples”Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:
use async_nursery::Nursery;use crossbeam_queue::SegQueue;use std::{sync::Arc, time::Duration};
use callbag::{filter, for_each, interval, map, pipe, take};
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
let actual = Arc::new(SegQueue::new());
pipe!( interval(Duration::from_millis(1_000), nursery.clone()), map(|x| x + 1), filter(|x| x % 2 == 1), take(5), for_each({ let actual = Arc::clone(&actual); move |x| { println!("{x}"); actual.push(x); } }),);
drop(nursery);async_std::task::block_on(nursery_out);
assert_eq!( &{ let mut v = vec![]; while let Some(x) = actual.pop() { v.push(x); } v }[..], [1, 3, 5, 7, 9]);Iterable programming examples
Section titled “Iterable programming examples”From a range of numbers, pick 5 of them and divide them by 4, then start pulling those one by one:
use crossbeam_queue::SegQueue;use std::sync::Arc;
use callbag::{for_each, from_iter, map, pipe, take};
#[derive(Clone)]struct Range { i: usize, to: usize,}
impl Range { fn new(from: usize, to: usize) -> Self { Range { i: from, to } }}
impl Iterator for Range { type Item = usize;
fn next(&mut self) -> Option<Self::Item> { let i = self.i; if i <= self.to { self.i += 1; Some(i) } else { None } }}
let actual = Arc::new(SegQueue::new());
pipe!( from_iter(Range::new(40, 99)), take(5), map(|x| x as f64 / 4.0), for_each({ let actual = Arc::clone(&actual); move |x| { println!("{x}"); actual.push(x); } }),);
assert_eq!( &{ let mut v = vec![]; while let Some(x) = actual.pop() { v.push(x); } v }[..], [10.0, 10.25, 10.5, 10.75, 11.0]);
Ok::<(), Box<dyn std::error::Error>>(())The list below shows what’s included.
Source factories
Section titled “Source factories”Sink factories
Section titled “Sink factories”Transformation operators
Section titled “Transformation operators”Filtering operators
Section titled “Filtering operators”Combination operators
Section titled “Combination operators”Utilities
Section titled “Utilities”Terminology
Section titled “Terminology”- source: a callbag that delivers data
- sink: a callbag that receives data
- puller sink: a sink that actively requests data from the source
- pullable source: a source that delivers data only on demand (on receiving a request)
- listener sink: a sink that passively receives data from the source
- listenable source: source which sends data to the sink without waiting for requests
- operator: a callbag based on another callbag which applies some operation
License
Section titled “License”Licensed under either of
- Apache License, Version 2.0, ([LICENSE-APACHE]!(LICENSE-APACHE) or https://www.apache.org/licenses/LICENSE-2.0)
- MIT license ([LICENSE-MIT]!(LICENSE-MIT) or https://opensource.org/licenses/MIT)
at your option.
Contribution
Section titled “Contribution”Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
Acknowledgements
Section titled “Acknowledgements”Thanks to André Staltz (@staltz) for creating the [callbag spec][callbag-spec].
This library is a port of https://github.com/staltz/callbag-basics. Some inspiration was taken from https://github.com/f5io/callbag.rs.
Many thanks to the awesome folks on the Rust Users Forum for their help, especially:
[callbag-spec] !: https://github.com/callbag/callbag [ci-badge] !: https://github.com/teohhanhui/callbag-rs/actions/workflows/ci.yml/badge.svg [ci-url] !: https://github.com/teohhanhui/callbag-rs/actions/workflows/ci.yml [crates-badge] !: https://img.shields.io/crates/v/callbag [crates-url] !: https://crates.io/crates/callbag [docs-badge] !: https://img.shields.io/docsrs/callbag [docs-url] !: https://docs.rs/callbag [license-badge] !: https://img.shields.io/crates/l/callbag [license-url] !: https://github.com/teohhanhui/callbag-rs#license [tc39-async-iteration] !: https://github.com/tc39/proposal-async-iteration [tc39-observable] !: https://github.com/tc39/proposal-observable