Seen provides async/await, parallel for loops, and synchronization primitives for concurrent programming.
Async functions are implemented using LLVM coroutines.
@async
fun fetchData(url: String) r: String {
let response = await httpGet(url)
return response.body
}
The @async decorator transforms the function into a coroutine that returns a handle (ptr).
@async
fun processAll() r: Int {
let a = await fetchData("https://api.example.com/a")
let b = await fetchData("https://api.example.com/b")
return a.length() + b.length()
}
fun main() {
let rt = new_async_runtime()
runtime_spawn(rt, processAll(), "main_task")
runtime_run_until_complete(rt)
}
Runtime methods:
new_async_runtime() -- create a runtimeruntime_spawn(rt, task, name) -- schedule a coroutineruntime_tick(rt) -- process one tickruntime_run_until_complete(rt) -- run until all tasks finishruntime_block_on_int(rt, task) -- block on a task returning Intruntime_block_on_void(rt, task) -- block on a void taskruntime_pending_count(rt) -- number of pending tasksruntime_stop(rt) -- stop the runtimeYield control back to the runtime from within an async function:
@async
fun longRunning() {
var i = 0
while i < 1000 {
doWork(i)
async_yield() // allow other tasks to run
i = i + 1
}
}
Structured concurrency with scopes:
let scope = new_async_scope()
scope_spawn(scope, task1(), "t1")
scope_spawn(scope, task2(), "t2")
scope_join(scope) // wait for all tasks
scope_cancel(scope) // or cancel all
Fork-based parallel iteration:
var results = Array<Int>.withLength(1000)
parallel_for i in 0..1000 {
results[i] = computeExpensive(i)
}
Each iteration may run in a separate forked process.
let mutex = Mutex.new()
mutex.lock()
// critical section
mutex.unlock()
Mutex.new() creates a real pthread mutex. Methods:
lock() -- acquire the lock (blocking)unlock() -- release the locktryLock() -- non-blocking attempt, returns BoolRead-write lock for concurrent reads:
let rwlock = RwLock.new()
// Multiple readers
rwlock.readLock()
let data = sharedData
rwlock.readUnlock()
// Exclusive writer
rwlock.writeLock()
sharedData = newValue
rwlock.writeUnlock()
Synchronize N threads at a rendezvous point:
let barrier = Barrier.new(4) // 4 threads must arrive
// In each thread:
barrier.wait() // blocks until all 4 arrive
Lock-free integer operations:
let counter = AtomicInt.new(0)
counter.store(42)
let val = counter.load()
counter.compareExchange(expected, desired)
Operations:
load() / load_relaxed() / load_acquire() -- read valuestore() / store_release() -- write valuecompareExchange(expected, desired) -- CAS operationMessage passing between threads (MPSC via Unix pipes):
let ch = Channel.new()
ch.send(42)
let value = ch.receive()
Per-thread storage:
let tls = ThreadLocal.new()
tls.set(42)
let value = tls.get()
@sendMark a type as safe to transfer between threads:
@send
class Message {
var data: String
}
@syncMark a type as safe to share between threads:
@sync
class SharedCounter {
var mutex: Mutex
var count: Int
}
let pool = WorkStealingPool.new(4) // 4 worker threads
pool.submit(taskFunction, arg)
pool.shutdown()
actor CounterActor {
var count: Int
receive Increment {
this.count = this.count + 1
}
receive GetCount {
reply this.count
}
}
