[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
futures in cscheme
You need the runtime file "future.scm". A version from an earlier
CScheme follows -- I've partially munged it to work with the 5.1 beta
release but there appear to be some remaining problems. Please send
me back a copy if you get it to work better; I don't have time to work
on it right now. I suspect that make-future needs to be changed to
use make-cheap-future but I'm not sure...
======================================================================
; This is -*- SCHEME -*- code
;; File-level compiler declarations: request the usual integrations and
;; list three primitives as compilable primitive functions.
(declare (usual-integrations)
(compilable-primitive-functions
vector-set!
get-external-number
within-control-point))
;; Bindings for the microcode primitives used by the futures runtime.
;; Each is obtained with MAKE-PRIMITIVE-PROCEDURE from the primitive of
;; the same name; what each primitive does is defined by the microcode
;; and is not visible in this file.

;; Work queue, cross-processor interrupt, and touching a future.
(define put-work (make-primitive-procedure 'PUT-WORK))
(define global-interrupt (make-primitive-procedure 'GLOBAL-INTERRUPT))
(define touch (make-primitive-procedure 'TOUCH))
;; The "-IF-EQ?!" family (presumably conditional stores -- confirm
;; against the microcode).
(define set-car-if-eq?! (make-primitive-procedure 'SET-CAR-IF-EQ?!))
(define set-cdr-if-eq?! (make-primitive-procedure 'SET-CDR-IF-EQ?!))
(define vector-set-if-eq?! (make-primitive-procedure 'VECTOR-SET-IF-EQ?!))
(define set-cxr-if-eq?! (make-primitive-procedure 'SET-CXR-IF-EQ?!))
;; Access to the slots of a future object, and its lock.
(define future-ref (make-primitive-procedure 'FUTURE-REF))
(define future-set! (make-primitive-procedure 'FUTURE-SET!))
(define future-size (make-primitive-procedure 'FUTURE-SIZE))
(define lock-future! (make-primitive-procedure 'LOCK-FUTURE!))
(define unlock-future! (make-primitive-procedure 'UNLOCK-FUTURE!))
;; Identity comparison used on futures throughout this file.
(define non-touching-eq? (make-primitive-procedure 'NON-TOUCHING-EQ?))
;; Processor / interpreter identification.
(define n-interpreters (make-primitive-procedure 'N-INTERPRETERS))
(define my-processor-number (make-primitive-procedure 'MY-PROCESSOR-NUMBER))
(define my-interpreter-number (make-primitive-procedure 'MY-INTERPRETER-NUMBER))
;; This is the stuff Anthony put here.
;; (define statistics-package
;; (make-environment
;; (define get-statistics
;; (make-primitive-procedure 'get-statistics #!true))
;; (define stat-names '((CONTENTION-COUNT . 0)
;; (GC-MASTER-IDLE-TIME . 1)
;; (GC-SLAVE-IDLE-TIME . 2)
;; (GC-LOOP-TIME . 3)
;; (GC-DAEMON-TIME . 4)))
;; (define This-load '())
;;
;; (define (load-statistics)
;; (set! this-load (get-statistics)))
;;
;; (define (get-statistic name)
;; (if (unassigned? this-load)
;; (load-statistics))
;; (vector-ref this-load (cdr (assq name stat-names))))
;;
;; (define (clear-statistics)
;; (set! this-load))))
;;
;; (define load-statistics (access load-statistics statistics-package))
;; (define get-statistic (access get-statistic statistics-package))
;; (define clear-statistics (access clear-statistics statistics-package))
;; Slots in a future
;; Each constant below is the index of a slot, used with FUTURE-REF
;; and FUTURE-SET!.  The order matches the vector built by MAKE-FUTURE.
;; MICROCODE KNOWS ABOUT THESE
;; FUTURE-DETERMINED-SLOT is #!TRUE if the value is known and immutable,
;; #!FALSE if not yet known, else known but mutable (i.e. KEEP-SLOT)
(define FUTURE-DETERMINED-SLOT 0)
;; FUTURE-LOCK-SLOT is #!TRUE if the future is locked by a process
(define FUTURE-LOCK-SLOT 1)
;; The next two are mutually exclusive and share the same slot index.
;; The VALUE is used if DETERMINED is not #!FALSE. The QUEUE contains
;; a WEAK queue of processes waiting for a value to appear if
;; DETERMINED is #!FALSE.
(define FUTURE-VALUE-SLOT 2)
(define FUTURE-QUEUE-SLOT 2)
; REFERENCED ONLY BY THE RUNTIME SYSTEM
;; Code to run to re-activate this process
(define FUTURE-PROCESS-SLOT 3)
;; The FUTURE-STATUS-SLOT contains one of:
;; RUNNING: Actually in possession of a processor
;; WAITING: Stopped waiting for the value of a future
;; PAUSED: Stopped by PAUSE-EVERYTHING
;; DELAYED: Created by delay scheduler and not yet run
;; RUNNABLE: Available for execution
;; DETERMINED: Value has been set and process is finished
;; CREATED: Future newly created
(define FUTURE-STATUS-SLOT 4)
;; For debugging purposes, the original thunk to be executed.
(define FUTURE-ORIG-CODE-SLOT 5)
;; Filled by MAKE-FUTURE with either the future's name or a vector
;; holding a console channel opened under that name.
(define FUTURE-PROCESS-PRIVATE-SLOT 6)
;; If this process has status WAITING this is a (strong) list of the
;; futures on which it is waiting.
(define FUTURE-WAITING-ON-SLOT 7)
;; For GC metering (not used by simulator)
(define FUTURE-METER-SLOT 8)
;; For users:
(define FUTURE-USER-SLOT 9)
;; Macros for dealing with atomicity.
;;
;; ADD-SYNTAX! installs EXPANDER under NAME in the system global
;; syntax table.  Note the difference in timing: DEFINE-MACRO takes
;; effect when the text is turned into code (i.e. at syntax time),
;; while ADD-SYNTAX! takes effect only when the program is actually
;; executed.  A macro that this file itself uses is therefore given
;; both definitions; a macro provided only for users, and not
;; referenced here, needs ADD-SYNTAX! alone.
(define-macro (add-syntax! name expander)
  (list 'SYNTAX-TABLE-DEFINE
        'SYSTEM-GLOBAL-SYNTAX-TABLE
        name
        expander))
;; ATOMIC takes a sequence of expressions and makes them the body of
;; a thunk handed to WITHOUT-INTERRUPTS, so that they are done
;; without interrupts.  It is defined both with DEFINE-MACRO (for use
;; within this file) and with ADD-SYNTAX! (for use at run time).
(define-macro (atomic . expressions)
  `(WITHOUT-INTERRUPTS (LAMBDA () ,@expressions)))

(add-syntax! 'ATOMIC
  (macro expressions
    `(WITHOUT-INTERRUPTS (LAMBDA () ,@expressions))))
;; PROG1 is like the MACLISP macro of the same name: the expressions
;; are evaluated in order and the value of the FIRST one is returned.
;;
;; The trailing expressions are wrapped in a thunk that is created
;; outside the scope of the temporary holding the first value.  They
;; therefore cannot see that temporary, so a user variable (or list
;; procedure) that happens to be called FIRST or REST keeps its own
;; meaning inside them.  With a single expression no temporary is
;; needed at all and the expression itself is the expansion.
(define-macro (PROG1 . exprs)
  (if (null? (CDR exprs))
      (CAR exprs)
      `((LAMBDA (FIRST REST)
          (REST)                        ; run the trailing expressions
          FIRST)                        ; then yield the first value
        ,(CAR exprs)
        (LAMBDA () ,@(CDR exprs)))))
;; DEFINE-ATOMIC is the procedural form of DEFINE with the whole body
;; wrapped in ATOMIC (and hence in WITHOUT-INTERRUPTS).  Like ATOMIC
;; it is defined both for syntax time and for run time.
(define-macro (define-atomic arg-template . body)
  `(DEFINE ,arg-template (ATOMIC ,@body)))

(add-syntax! 'DEFINE-ATOMIC
  (macro (arg-template . body)
    `(DEFINE ,arg-template (ATOMIC ,@body))))
;; LOCKING-FUTURE runs BODY through WITH-FUTURE-LOCKED, i.e. without
;; interrupts and bracketed by LOCK-FUTURE! / UNLOCK-FUTURE! on
;; FUTURE.  LOCKED? names a flag that BODY may test: it is #!true if
;; the future is still valid (you hang until you can lock it), or
;; #!false if it has been spliced out.
(define-macro (LOCKING-FUTURE future locked? . body)
  `(WITH-FUTURE-LOCKED ,future
     (LAMBDA (,locked?) ,@body)))

(add-syntax! 'LOCKING-FUTURE
  (macro (future locked? . body)
    `(WITH-FUTURE-LOCKED ,future
       (LAMBDA (,locked?) ,@body))))
;; WITH-STATE runs BODY as the body of a one-argument procedure passed
;; to NON-REENTRANT-TASK-CATCH; STATE names that argument.
(define-macro (WITH-STATE state . body)
  `(NON-REENTRANT-TASK-CATCH
     (LAMBDA (,state) ,@body)))
;; WITH-FUTURE-LOCKED is the procedure behind LOCKING-FUTURE.  It tries
;; to lock FUTURE and calls THUNK with one argument: #!true if the
;; lock was obtained (the lock is released once THUNK returns), or
;; #!false if it was not.  The value of THUNK is returned.  The whole
;; body runs inside ATOMIC.
(DEFINE-ATOMIC (with-future-locked future thunk)
  (cond ((not (lock-future! future))
         (thunk #!false))
        (else
         (let ((answer (thunk #!true)))
           (unlock-future! future)
           answer))))
(define scheduler
(make-environment
;; Declarations for the scheduler environment: usual integrations,
;; plus the weak-pair operations paired with the SYSTEM-PAIR
;; operations of the same shape as compilable primitive functions.
(declare (usual-integrations)
(compilable-primitive-functions
(weak-car system-pair-car)
(weak-cdr system-pair-cdr)
(weak-set-car! system-pair-set-car!)
(weak-set-cdr! system-pair-set-cdr!)))
;; Type code of control points; used by LEGITIMATE-PROCESS?.
(define control-point-type
(microcode-type 'CONTROL-POINT))
;; STI is the SETUP-TIMER-INTERRUPT primitive.
(define sti
(make-primitive-procedure 'setup-timer-interrupt #!true))
;; Empties the work queue; its result is fed to WEAK-LIST->LIST! in
;; PAUSE-EVERYTHING.
(define drain-work-queue!
(make-primitive-procedure 'drain-work-queue!))
;; Type code used by WEAK-CONS to build weak pairs.
(define weak-cons-type
(microcode-type 'WEAK-CONS))
;; Defined unassigned here; given their values further down once
;; CATCH-MAKER has been fetched.
(define non-reentrant-task-catch)
(define task-catch)
;; The two continuation-capturing primitives handed to CATCH-MAKER.
(define non-reentrant-call/cc
(make-primitive-procedure 'non-reentrant-call-with-current-continuation))
(define call/cc
(make-primitive-procedure 'call-with-current-continuation))
(define set-current-dynamic-state!
(make-primitive-procedure 'set-current-dynamic-state!))
;; Constructor for catch operations, taken from the continuation package.
(define catch-maker
(access catch-maker continuation-package))
;; Scheduler state.  Variables defined without a value start out
;; unassigned; FUTURES-ON? tests Current-Future-Vector for exactly that.
(define current-Future-Vector) ; Process currently running
(define the-paused-tasks) ; Tasks being suspended temporarily
(define Start-Process) ; Default scheduler for FUTURE creation
(define Idle-Future) ; Future to wait until idle on
(define discard-the-paused-tasks? #!false) ; Throw away tasks?
(define preempting? #!false) ; No timer currently set
(define Delta '()) ; Scheduling frequency, centi-seconds
;; Build the two catch operations with CATCH-MAKER: the non-reentrant
;; one (used by WITH-STATE) from NON-REENTRANT-CALL/CC, the re-entrant
;; one (used by AWAIT-INTERNAL) from CALL/CC.  The meaning of the final
;; flag argument is defined by CATCH-MAKER and is not visible here.
(set! non-reentrant-task-catch
(catch-maker non-reentrant-call/cc set-current-dynamic-state! #!true))
(set! task-catch
(catch-maker call/cc set-current-dynamic-state! #!false))
;; LEGITIMATE-PROCESS? is true of the things that may sit in a
;; future's PROCESS slot and be resumed: procedures and control points.
(define (legitimate-process? object)
  (if (not (procedure? object))
      (primitive-type? control-point-type object)
      #!true))
;; START-PREEMPTING turns on time-sliced scheduling with the given
;; INTERVAL.  The body runs inside ATOMIC (see DEFINE-ATOMIC).  If
;; preemption is already on it only prints a message.
(DEFINE-ATOMIC (start-preempting interval)
(if (not preempting?)
(begin
;; Install the handler to run on a timer interrupt.  TIMER-INTERRUPT
;; is not defined in this file -- presumably the runtime's timer
;; interrupt hook; confirm.
(set! timer-interrupt
(lambda ()
(let ((My-Task (Current-Future)))
(WITH-STATE me
(LOCKING-FUTURE My-Task I-am-running?
(if I-am-running?
(begin
;; Record where to resume, then requeue the interrupted task.
(future-set! My-Task FUTURE-PROCESS-SLOT me)
(more-work My-Task))
(begin
;; The current task could not be locked as a future.
(stop-preempting)
(bkpt "TIMER: Existential crisis!"))))
;; Hand this processor to whatever work comes next.
(next)))))
(set! preempting? #!true)
(set! delta interval)
;; Arm the timer through the SETUP-TIMER-INTERRUPT primitive.
(sti 0 interval))
(display "Already preempting when START-PREEMPTING called")))
;; STOP-PREEMPTING calls the timer setup primitive with null arguments
;; (presumably cancelling the timer -- confirm against the microcode)
;; and records that preemption is off.  DELTA is left unchanged.
(DEFINE-ATOMIC (stop-preempting)
(sti '() '())
(set! preempting? #!false))
;; MAKE-FUTURE builds a fresh, undetermined future object.
;;   orig-code      - goes in the PROCESS slot (how to resume)
;;   user-procedure - goes in the ORIG-CODE slot, for debugging
;;   name           - goes in the PROCESS-PRIVATE slot as is, unless
;;                    OPEN-CONSOLE-CHANNEL is bound, in which case a
;;                    channel is opened under that name and stored
;;                    there inside a one-element vector
;; The vector is laid out in the order of the FUTURE-...-SLOT
;; constants and then retyped as a FUTURE.
(define make-future
  (let ((future-type (microcode-type 'FUTURE)))
    (named-lambda (make-future orig-code user-procedure name)
      (let ((private-data
             (if (unbound? open-console-channel)
                 name
                 (vector (open-console-channel name)))))
        (primitive-set-type
         future-type
         (vector #!false              ; DETERMINED: No value yet
                 #!false              ; LOCK: Not locked
                 (make-empty-queue)   ; VALUE/QUEUE: No waiters
                 orig-code            ; PROCESS: How to resume
                 'CREATED             ; STATUS: Ready to go
                 user-procedure       ; ORIG_CODE: For debugging
                 private-data         ; PROCESS-PRIVATE: Butterfly??
                 '()                  ; WAITING-ON: Not waiting
                 0                    ; METER: Ignored by simulator
                 '()))))))            ; USER-SLOT
;; MORE-WORK marks the future WORK as RUNNABLE and puts it on the work
;; queue.  If a time slice (DELTA) is configured and preemption is not
;; currently on, it also starts preemption.
(define (more-work work)
  (future-set! work FUTURE-STATUS-SLOT 'RUNNABLE)
  (put-work work)
  (if delta
      (if (not preempting?)
          (start-preempting delta))))
;; SPAWN-PROCESS creates a future that will run THUNK and hands it to
;; a creation scheduler.
;;   thunk - the work to do; also recorded as the future's original code
;;   doc   - passed to MAKE-FUTURE as the future's name
;;   start - optional creation scheduler; START-PROCESS is used if omitted
;; The new process first reinstates the dynamic state that was current
;; when SPAWN-PROCESS was called.  Returns the new future.
(define spawn-process
  (let ((make-initial-process
         (make-primitive-procedure 'make-initial-process)))
    (named-lambda (spawn-process thunk doc #!optional start)
      (let ((dynamic-state (current-dynamic-state)))
        (let ((object
               (make-future
                (make-initial-process
                 (lambda ()
                   (set-current-dynamic-state! dynamic-state)
                   (thunk)))
                thunk
                doc)))
          ((if (unassigned? start) start-process start) object)
          object)))))
;; END-OF-COMPUTATION-HANDLER is installed by INITIALIZE-SCHEDULER! as
;; the END-OF-CONTINUATION termination handler.  It determines the
;; current future with VALUE (passing #!false as KEEP-SLOT?) and then
;; goes looking for more work.  EXPRESSION and ENVIRONMENT are unused.
(DEFINE-ATOMIC (end-of-computation-handler expression environment value)
  (determine! (current-future) value #!false)
  (next))
;; DETERMINE! gives FUTURE its VALUE and wakes the processes queued on it.
;;   future     - the future to determine
;;   value      - the value to store; also returned as the result
;;   keep-slot? - optional.  When supplied, the DETERMINED slot becomes
;;                'KEEP-SLOT if it is true and #!TRUE otherwise.  When
;;                omitted, the slot becomes #!TRUE only if it was #!FALSE.
;; Signals an error if the DETERMINED slot is already #!TRUE, or if the
;; future cannot be locked.
(define (determine! future value #!optional keep-slot?)
;; AWAKEN! is called with a queue (of processes waiting
;; for a future) and empties it, making runnable each process that
;; can be locked, still has status WAITING, and lists FUTURE among
;; the futures it is waiting on.
(define (awaken! queue)
(let loop ()
(if (empty-queue? queue)
'DONE
(let ((next-item (dequeue! queue)))
(LOCKING-FUTURE next-item item-runnable?
(if (and
item-runnable?
(eq? (future-ref next-item FUTURE-STATUS-SLOT) 'WAITING)
(non-touching-memq future (future-ref next-item FUTURE-WAITING-ON-SLOT)))
(begin
;; Record which future did the waking: the slot now holds FUTURE
;; itself rather than a list (AWAIT-INTERNAL reads it back).
(future-set! next-item FUTURE-WAITING-ON-SLOT future)
(more-work next-item))))
(loop)))))
(LOCKING-FUTURE future was-still-a-future?
(if was-still-a-future?
;; Read the old state first: VALUE and QUEUE share a slot, so the
;; waiters must be fetched before the value overwrites them.
(let ((known? (future-ref future FUTURE-DETERMINED-SLOT))
(waiters (future-ref future FUTURE-QUEUE-SLOT)))
(if (eq? known? #!true)
(error "Future cannot be determined twice." future))
(future-set! future FUTURE-VALUE-SLOT value)
(future-set! future FUTURE-STATUS-SLOT 'DETERMINED)
(if (unassigned? keep-slot?)
(if (eq? known? #!false)
(future-set! future FUTURE-DETERMINED-SLOT #!true))
(future-set! future FUTURE-DETERMINED-SLOT
(if keep-slot? 'KEEP-SLOT #!true)))
;; Only a previously undetermined future has a waiter queue.
(if (not known?) (awaken! waiters)))
(error "Future cannot be determined twice." future)))
value)
;; FUTURES-ON? is true once the scheduler has been initialized, i.e.
;; while Current-Future-Vector is assigned.
(define (Futures-On?)
  (if (unassigned? Current-Future-Vector)
      #!false
      #!true))
;; FUTURES-OFF pauses everything (the pause object returned by
;; PAUSE-EVERYTHING is dropped, so nothing here restarts the paused
;; tasks) and then makes Current-Future-Vector unassigned, which is
;; what FUTURES-ON? tests.  Returns the symbol FUTURES-TURNED-OFF.
(define (Futures-Off)
(pause-everything)
(set! Current-Future-Vector)
'FUTURES-TURNED-OFF)
;; CURRENT-FUTURE returns this interpreter's entry in
;; Current-Future-Vector, or '() when futures are not turned on.
(define (Current-Future)
  (if (not (Futures-On?))
      '()
      (vector-ref Current-Future-Vector (My-Interpreter-Number))))
;; SET-CURRENT-FUTURE! stores FUTURE as this interpreter's entry in
;; Current-Future-Vector.
(define (Set-Current-Future! Future)
  (let ((my-slot (My-Interpreter-Number)))
    (vector-set! Current-Future-Vector my-slot Future)))
;; INITIALIZE-SCHEDULER! (re)starts the futures system.
;;   interval          - optional time slice; stored in DELTA if supplied
;;   default-scheduler - optional creation scheduler for new futures;
;;                       DFUTURE-SCHEDULER if omitted
;;   non-aborting?     - optional; if supplied and true, return normally
;;                       instead of aborting to the top-level driver
;; When it returns normally the value is DELTA, or the symbol
;; NOT-PREEMPTIVE-SCHEDULING if DELTA is false.
(define (initialize-scheduler!
#!optional interval default-scheduler non-aborting?)
(let ((set-fixed-objects-vector!
(make-primitive-procedure 'set-fixed-objects-vector!)))
(pause-everything) ; Stop all processors & drain queue
;; Install END-OF-COMPUTATION-HANDLER as the END-OF-CONTINUATION
;; termination handler, creating the handler vector if it is empty.
(let ((termination-handlers
(vector-ref (get-fixed-objects-vector)
(fixed-objects-vector-slot
'MICROCODE-TERMINATIONS-PROCEDURES))))
(if (= (vector-length termination-handlers) 0)
(begin
(set! termination-handlers
(vector-cons number-of-microcode-terminations '()))
(vector-set! (get-fixed-objects-vector)
(fixed-objects-vector-slot
'MICROCODE-TERMINATIONS-PROCEDURES)
termination-handlers)
(set-fixed-objects-vector! (get-fixed-objects-vector))))
(vector-set! termination-handlers
(microcode-termination 'END-OF-CONTINUATION)
end-of-computation-handler))
(set! Start-Process
(if (unassigned? default-scheduler)
dfuture-scheduler
default-scheduler))
;; One current-future entry per interpreter.
(set! Current-Future-Vector
(vector-cons (N-Interpreters) 'NO-FUTURE-YET))
(set! Idle-Future (make-future 'NO-PROCESS 'NO-PROCESS "Idle-Loop"))
;; Record the requested time slice, if one was given.
(if (not (unassigned? interval)) (set! delta interval))
;; Register AWAIT-FUTURE in the SCHEDULER slot of the fixed objects vector.
(let ((fobj (get-fixed-objects-vector)))
(vector-set! fobj (fixed-objects-vector-slot 'SCHEDULER)
await-future)
(set-fixed-objects-vector! fobj))
;; The caller becomes the initial, running process.
(Set-Current-Future!
(make-future 'INITIAL-PROCESS 'INITIAL-PROCESS "The Initial Process"))
(future-set! (current-future) FUTURE-STATUS-SLOT 'RUNNING)
;; Handler passed to GLOBAL-INTERRUPT: restore the interrupt enables
;; and go look for work.  (Presumably this runs on the other
;; interpreters -- confirm against the primitive.)
(global-interrupt
1
(lambda (IntCode IntEnb)
(set-interrupt-enables! IntEnb)
(next))
(lambda () #!true))
;; Either return quietly or abort to the top-level driver with a
;; message.  DELTA is printed as seconds with two decimal digits;
;; a negative DELTA is reported as "real" time, otherwise "runtime".
;; NOTE(review): (format () ...) passes an unquoted () as the first
;; argument; this relies on the dialect accepting () as an expression.
(if (or (unbound? abort-to-top-level-driver)
(and (not (unassigned? non-aborting?))
non-aborting?))
(or Delta 'NOT-PREEMPTIVE-SCHEDULING)
(abort-to-top-level-driver
(cond ((unbound? format) "^G to restart the futures")
((not Delta) "^G: no preemptive scheduling")
((negative? Delta)
(format () "^G: scheduling ~o.~o~o (real) secs."
(quotient (abs Delta) 100)
(remainder (quotient (abs Delta) 10) 10)
(remainder (remainder (abs Delta) 10) 10)))
(else
(format () "^G: scheduling ~o.~o~o (runtime) secs."
(quotient Delta 100)
(remainder (quotient Delta 10) 10)
(remainder (remainder Delta 10) 10))))))))
; Scheduling support
;; NEXT gives up the current processor: it marks this interpreter as
;; WAITING-FOR-WORK, asks GET-WORK for a future and RUNs it.
;; The procedure handed to GET-WORK (presumably called when no work
;; is available -- confirm against the primitive) stops preemption,
;; determines the current Idle-Future with 'DONE so that anything
;; waiting on it wakes up, installs a fresh Idle-Future and asks
;; for work once more; if that second request also falls through, an
;; error is signalled before trying again.
(define (next)
(let ((get-work (make-primitive-procedure 'get-work)))
(Set-Current-Future! 'WAITING-FOR-WORK)
(run (get-work
(named-lambda (loop)
(stop-preempting)
(determine! Idle-Future 'DONE)
(set! Idle-Future
(make-future 'NO-PROCESS 'NO-PROCESS "Idle Loop"))
(run (get-work
(lambda ()
(error "No Work Available")
(loop)))))))))
;; RUN starts a process running.
;; Under the future's lock it decides what to do and returns a thunk;
;; the thunk is called after the lock has been released (note the
;; extra pair of parentheses around LOCKING-FUTURE).  If FUTURE is
;; lockable, was RUNNABLE and holds a legitimate process, the thunk
;; resumes that process with 'YOUR-TURN; in every other case the thunk
;; is NEXT.
(define (run future)
((LOCKING-FUTURE future Still-A-Future?
(if Still-A-Future?
;; The results of FUTURE-SET! are used as the slots' previous
;; contents: the process to resume and the status before RUNNING.
(let ((new-process
(future-set! future
FUTURE-PROCESS-SLOT (My-Interpreter-Number)))
(old-status (future-set! future FUTURE-STATUS-SLOT 'RUNNING)))
(if (and (legitimate-process? new-process)
(eq? old-status 'RUNNABLE))
(begin
(Set-Current-Future! future)
(if (and delta preempting?)
(sti 0 delta)) ; Full time interval
(lambda () (new-process 'YOUR-TURN)))
(begin
;; Not runnable after all: put both slots back as they were.
(future-set! future FUTURE-STATUS-SLOT old-status)
(future-set! future FUTURE-PROCESS-SLOT new-process)
next)))
next))))
;; AWAIT-FUTURE suspends the current process and adds it to the
;; queue waiting for the specified future to get a value. The
;; thunk, if specified, is executed immediately before going off for
;; more work to do (i.e. after all of the enqueuing work, etc. is done).
;;   future - the future whose value is awaited
;;   thunk  - optional; called with no arguments after the enqueuing
;;            work, whether or not the caller ends up suspended
(DEFINE-ATOMIC (await-future future #!optional thunk)
(WITH-STATE me
(let
((perform-normal-return?
(LOCKING-FUTURE future waiting-for-a-future?
(if (or (not waiting-for-a-future?) ; Already determined
(future-ref future FUTURE-DETERMINED-SLOT))
#!TRUE ; Return normally
(let ((My-Task (Current-Future))
(status (future-ref future FUTURE-STATUS-SLOT)))
;; A future that was delayed or paused is put on the work queue
;; now that somebody is waiting for it.
(if (or (eq? status 'DELAYED) (eq? status 'PAUSED))
(more-work future))
(LOCKING-FUTURE My-Task I-am-running?
(if I-am-running?
(begin
;; Record how to resume the caller, mark it WAITING on FUTURE,
;; and add it to FUTURE's queue of waiters.
(future-set! My-Task FUTURE-PROCESS-SLOT me)
(future-set! My-Task FUTURE-STATUS-SLOT 'WAITING)
(future-set! My-Task FUTURE-WAITING-ON-SLOT
(list future))
(enqueue! (future-ref future FUTURE-QUEUE-SLOT) My-Task))
(display "AWAIT-FUTURE: Existential crisis!")))
#!FALSE))))) ; Don't return normally
(if (not (unassigned? thunk)) (thunk)) ; Do the optional work
(if perform-normal-return? ; Done or find more work
(me 'DONE)
(next)))))
;; AWAIT-FUTURE-AFTER-ACTION suspends the current process, adds it to
;; the queue waiting for the specified future to get a value, and
;; executes the action thunk.  Its purpose is to ensure that the
;; process is actually on the wait queue of the future when the action
;; takes place.  This prevents a race condition which might cause
;; problems if the action is intended to determine the future (e.g.,
;; externally) and wake up the process.  In other words, if the
;; process had not been added to the wait queue when another process
;; or an external interrupt determines the future, the event would not
;; wake up the process as intended.
;;
;; This was added to support the new console i/o system.  Normally the
;; future will have been explicitly constructed by the user and is not
;; being determined by any other process.
;;
;; NOTE(review): earlier commentary described this as a stripped-down
;; AWAIT-FUTURE with safeguards removed for speed.  As written it
;; simply calls AWAIT-FUTURE with ACTION as the optional thunk, so it
;; has exactly the checks (and cost) of AWAIT-FUTURE.
(define (await-future-after-action future action)
(await-future future action))
;; AWAIT-INTERNAL implements DISJOIN and AWAIT-FIRST-OF.
;;   My-Task - a freshly made future standing for the disjunction
;;   futures - the list of futures to wait on
;; It registers My-Task as a waiter on each of FUTURES and returns
;; My-Task.  When My-Task is later resumed, it is determined with
;; whatever is found in its WAITING-ON slot, and the processor then
;; goes on to other work.
(define await-internal
;; Unique object used to recognize the first, direct return from
;; TASK-CATCH as opposed to a later re-entry.
(let ((fall-through-tag (cons 'FALL-THROUGH '())))
(named-lambda (await-internal My-Task futures)
(let ((Value-Known? #!false))
;; Determine My-Task at most once.  This uses the value returned
;; by SET! as the variable's previous value (assumes this dialect's
;; SET! returns the old value -- confirm).
(define (announce-value value)
(if (not (set! Value-Known? #!true))
(determine! My-Task value)))
;; Queue My-Task on each undetermined future in turn.  On meeting
;; one that cannot be locked or already has a non-null DETERMINED
;; slot, announce that future as the value and stop.
(define (spawn-processes disjuncts)
(if (null? disjuncts)
'DONE
(let ((this-future (car disjuncts)))
(if (LOCKING-FUTURE this-future really-a-future?
(if (and really-a-future?
(null?
(future-ref
this-future FUTURE-DETERMINED-SLOT)))
(begin
(enqueue!
(future-ref this-future FUTURE-QUEUE-SLOT)
My-Task)
#!true)
(begin
(announce-value this-future)
#!false)))
(spawn-processes (cdr disjuncts))))))
(if (eq? fall-through-tag
(task-catch (lambda (me) ; Deliberately re-entrant
(future-set! My-Task FUTURE-PROCESS-SLOT me)
(future-set! My-Task FUTURE-STATUS-SLOT 'WAITING)
(future-set! My-Task FUTURE-WAITING-ON-SLOT futures)
(spawn-processes futures)
fall-through-tag)))
;; First time through: hand the disjunction future to the caller.
My-Task
;; Re-entered: DETERMINE!'s AWAKEN! stores the future that did the
;; waking in the WAITING-ON slot before requeueing the task.
(begin
(announce-value (future-ref My-Task FUTURE-WAITING-ON-SLOT))
(next)))))))
;; DISJOIN takes any number of futures as separate arguments and
;; returns a new future that stands for the first of them to get a
;; value.  It is AWAIT-FIRST-OF with a spread argument list.
(define (disjoin . futures)
  (await-first-of futures))
;; AWAIT-FIRST-OF takes a list of futures and returns a new future,
;; named "Disjoin", that AWAIT-INTERNAL registers as a waiter on each
;; of them.
(define (await-first-of futures)
  (let ((disjunction (make-future 'DISJOIN 'DISJOIN "Disjoin")))
    (await-internal disjunction futures)))
; Special scheduler operations
;; RESCHEDULE allows me to give up my processor slice and
;; wait until the scheduler gets back to me.
;; Takes no arguments.  If the current future can be locked, its
;; resumption point is saved, it is put back on the work queue, and the
;; processor goes to NEXT; otherwise the symbol
;; NOT-CURRENTLY-RUNNING-A-FUTURE is returned.
(define-atomic (reschedule)
(let ((my-task (current-future)))
(WITH-STATE me
;; The LOCKING-FUTURE body yields the lock flag, so the test below
;; is made after the lock has been released.
(if (LOCKING-FUTURE my-task am-I-running?
(if am-I-running?
(begin
(future-set! my-task FUTURE-PROCESS-SLOT me)
(more-work my-task)))
am-I-running?)
(next)
'NOT-CURRENTLY-RUNNING-A-FUTURE))))
;; WAIT-UNTIL-IDLE causes a process to just continue
;; going to sleep until there are no other active processes.
;; Implemented by touching Idle-Future, which NEXT determines (with
;; 'DONE) when a request for work comes up empty.
(define (wait-until-idle) (touch idle-future))
;; DFUTURE-SCHEDULER is a future creation scheduler which
;; defers the child process and continues on with the parent.
;; Note that all creation schedulers are called as part of
;; the parent process, so this is the easy case.
;; Queues FUTURE with MORE-WORK and returns the symbol
;; CHILD-QUEUED-FOR-EXECUTION to the (still running) parent.
(DEFINE-ATOMIC (dfuture-scheduler future)
(more-work future)
'CHILD-QUEUED-FOR-EXECUTION)
;; FUTURE-SCHEDULER is a future creation scheduler which
;; defers the parent process and continues on with the child.
;; This is a little harder than DFUTURE, since it is called
;; running as the parent.
(DEFINE-ATOMIC (future-scheduler future)
(WITH-STATE parent-process
(let ((My-Future (Current-Future)))
(LOCKING-FUTURE My-Future Still-Runnable?
(if Still-Runnable?
(begin
;; Save the parent's resumption point and requeue the parent.
(future-set! My-Future FUTURE-PROCESS-SLOT parent-process)
(more-work My-Future))))))
;; Make the child RUNNABLE (RUN insists on that status) and start it
;; on this processor.  MORE-WORK also puts the child on the work
;; queue; RUN hands the processor to NEXT for any future whose
;; status is no longer RUNNABLE when it is picked up.
(more-work future)
(run future))
;; DELAY-SCHEDULER is a future creation scheduler which defers
;; execution of the newly created future until it is first
;; touched.
;; Only marks FUTURE as DELAYED; it is not put on the work queue here.
;; AWAIT-FUTURE queues a DELAYED future when something waits on it.
;; Returns the symbol OK-I-DELAYED-IT.
(DEFINE-ATOMIC (delay-scheduler future)
(future-set! future FUTURE-STATUS-SLOT 'DELAYED)
'OK-I-DELAYED-IT)
;; Queue Abstraction
;;
;;     -------------------------------
;;     | Tail Pointer | Head Pointer |
;;     -------------------------------
;;           |             |
;;           |             |
;;           V             V
;;         -----  -----  -----
;;         | |=|=>| |=|=>| |/|  add new items by clobbering '()
;;         -----  -----  -----
;;     remove from start of list
;;     (The list itself is made from WEAK cons cells)
;;
;; New items are added at the cell the Head pointer refers to and
;; removed from the cell the Tail pointer refers to.  In the queue
;; header pair the Head pointer is the CAR and the Tail pointer is
;; the CDR.
;;
;; The queue is empty when Tail=Head=#!NULL
;; (thus it has one item when Tail=Head but they are not #!NULL)
;;
;; These operations assume that the caller has arranged for any
;; desired atomicity.
;; WEAK-CONS builds a pair of the WEAK-CONS microcode type from the
;; two given components.
(define (weak-cons the-car the-cdr)
  (system-pair-cons weak-cons-type the-car the-cdr))
;; Accessors and mutators for weak pairs: each is simply the
;; SYSTEM-PAIR operation of the same shape under another name.
(define weak-car system-pair-car)
(define weak-cdr system-pair-cdr)
(define weak-set-car! system-pair-set-car!)
(define weak-set-cdr! system-pair-set-cdr!)
;; MAKE-EMPTY-QUEUE returns a fresh queue header: an ordinary pair
;; whose head pointer (car) and tail pointer (cdr) are both null.
;; Both empty lists are quoted; a bare () is not a valid expression
;; in standard Scheme.
(define (make-empty-queue) (cons '() '()))
;; The queue header is an ordinary pair: the head pointer (where
;; ENQUEUE! adds) is its car, the tail pointer (where DEQUEUE!
;; removes) is its cdr.
(define queue-head-ptr car)
(define queue-tail-ptr cdr)
(define set-queue-head-ptr! set-car!)
(define set-queue-tail-ptr! set-cdr!)
;; EMPTY-QUEUE? is true when the queue's head pointer is null.
(define (empty-queue? queue)
  (let ((newest-cell (queue-head-ptr queue)))
    (null? newest-cell)))
;; ENQUEUE! adds OBJECT to QUEUE in a new weak cell.  In an empty
;; queue the new cell becomes both the head and the tail; otherwise it
;; is linked after the current head cell and becomes the new head.
(define (enqueue! queue object)
  (let ((last-cell (queue-head-ptr queue))
        (new-cell (weak-cons object '())))
    (if (null? last-cell)
        (begin
          (set-queue-head-ptr! queue new-cell)
          (set-queue-tail-ptr! queue new-cell))
        (begin
          (weak-set-cdr! last-cell new-cell)
          (set-queue-head-ptr! queue new-cell)))))
;; DEQUEUE! removes the cell at the tail pointer and returns its
;; contents.  Removing the last cell resets both pointers to null.
;; Signals "Queue empty" if there is nothing to remove.
(define (dequeue! queue)
  (let ((front-cell (queue-tail-ptr queue)))
    (cond ((null? front-cell)
           (error "Queue empty" queue))
          (else
           (let ((item (weak-car front-cell))
                 (following (weak-cdr front-cell)))
             (if (null? following)
                 (begin
                   (set-queue-head-ptr! queue '())
                   (set-queue-tail-ptr! queue '()))
                 (set-queue-tail-ptr! queue following))
             item)))))
;; SAVING-STATE wraps up the current state of the system into the
;; current future and returns it to the work queue. It then executes
;; the thunk. If the current future is invoked the call to
;; SAVING-STATE is exited; when the thunk returns, the processor will
;; wait for new work to perform.
;;   thunk - procedure of no arguments, run in the error continuation
;; Returns the symbol COMPLETED (reached when the saved state is resumed).
(define (saving-state thunk)
(WITH-STATE my-state
(let ((my-future (current-future)))
(LOCKING-FUTURE my-future am-I-running?
(if am-I-running?
(begin
;; Same effect as MORE-WORK, except that preemption is not started.
(future-set! my-future FUTURE-PROCESS-SLOT my-state)
(future-set! my-future FUTURE-STATUS-SLOT 'RUNNABLE)
(put-work my-future)))))
(set-current-future! 'STATE-SAVED)
;; Make the local variable unassigned, dropping this reference to
;; the saved state.
(set! my-state)
(within-control-point the-error-continuation
(lambda ()
(thunk)
(next))))
'COMPLETED)
;; PAUSE-EVERYTHING is used to make every processor but the caller
;; save its state and go quiescent. The value returned by
;; Pause-Everything is a procedure (a "pause object") which will put
;; the work queue back to its initial state (modulo order of futures
;; on the queue).  The pause object takes one optional message:
;;   Restart-tasks (the default) - requeue the paused tasks; may be
;;                                 used only once
;;   Any-Tasks?                  - true if there are tasks to restart
;;   The-Tasks                   - the list of paused tasks ('() once
;;                                 they have been restarted)
(DEFINE-ATOMIC (pause-everything)
;; RELEASE-STATE! takes a list of futures and puts them
;; on the work queue.  Only futures that can be locked, hold a
;; legitimate process and are still PAUSED are requeued.
(define (release-state! list)
(if (null? list)
'RESTARTED
(let ((work-unit (car list)))
(LOCKING-FUTURE work-unit work-to-do?
(if (and work-to-do?
(legitimate-process?
(future-ref work-unit FUTURE-PROCESS-SLOT))
(eq? (future-ref work-unit FUTURE-STATUS-SLOT)
'PAUSED))
(more-work work-unit)))
(release-state! (cdr list)))))
;; WEAK-LIST->LIST! takes a weak list of futures, as
;; returned by DRAIN-WORK-QUEUE! and converts it to a list of
;; the objects referenced. The GC code needs the weak form,
;; hence the extra work here. In the process, each future is
;; made to be PAUSED so it will automatically resume if touched.
;; Note that the recursive call is made inside LOCKING-FUTURE, so
;; each future's lock is held until the rest of the list is done.
(define (weak-list->list! weak-list)
(let loop ((current weak-list)
(result '()))
(if (null? current)
result
(let ((work-unit (weak-car current)))
(LOCKING-FUTURE work-unit work-to-do?
(if work-to-do?
(begin
(future-set! work-unit FUTURE-STATUS-SLOT 'PAUSED)
(loop (weak-cdr current)
(cons work-unit result)))
(loop (weak-cdr current) result)))))))
;; RETURNED-OBJECT builds the pause object around THE-QUEUE.  Once
;; the tasks have been restarted THE-QUEUE is replaced by #!true,
;; which marks the object as used up.
(define ((returned-object the-queue) #!optional message)
(if (unassigned? message) (set! message 'Restart-tasks))
(cond ((eq? message 'Any-Tasks?)
(and (not (eq? the-queue #!true))
(not (null? the-queue))))
((eq? message 'Restart-tasks)
(if (not (eq? the-queue #!true))
(release-state! the-queue)
(error "Attempt to re-use a pause object!"))
(set! the-queue #!true))
((eq? message 'The-Tasks)
(if (eq? the-queue #!true)
'()
the-queue))
(else (error "Pause object: strange message" message))))
(if (not (Futures-On?))
;; Nothing is running: return a pause object with no tasks.
(returned-object '())
;; MAKE-SYNCHRONIZER and AWAIT-SYNCHRONY are not defined in this
;; file; they are used here as rendezvous points between the caller
;; and the interrupted processors (presumably barriers -- confirm).
(let ((save-synch (make-synchronizer))
(drain-synch (make-synchronizer))
(proceed-synch (make-synchronizer)))
(stop-preempting)
;; Handler passed to GLOBAL-INTERRUPT: rendezvous, save the running
;; state with SAVING-STATE, then wait at the two later rendezvous
;; points before looking for work again.
(global-interrupt
1
(lambda (int-code int-mask)
(await-synchrony save-synch)
(saving-state
(lambda ()
(set-interrupt-enables! int-mask)
(await-synchrony drain-synch)
(await-synchrony proceed-synch))))
(lambda () #!TRUE))
(await-synchrony save-synch)
(await-synchrony drain-synch)
;; Empty the work queue and mark every drained future PAUSED, then
;; reset the current-future vector, keeping only the caller's entry.
(let ((me (current-future))
(the-queue (weak-list->list! (drain-work-queue!))))
(set! Current-Future-Vector (vector-cons (N-Interpreters) 'PAUSED))
(Set-Current-Future! me)
(await-synchrony proceed-synch)
(returned-object the-queue)))))
;; WITH-TASKS-SUSPENDED executes the thunk with all other processes
;; stopped. It returns the value of the thunk.
;; When futures are off the thunk is simply called.  Otherwise the
;; pause object from PAUSE-EVERYTHING is fluid-bound to
;; THE-PAUSED-TASKS and the thunk runs inside DYNAMIC-WIND, which
;; announces the suspension beforehand and afterwards either resumes
;; the tasks or, if DISCARD-THE-PAUSED-TASKS? is set, drops them.
(define (with-tasks-suspended thunk)
(if (not (Futures-On?))
(thunk)
(fluid-let ((the-paused-tasks (pause-everything)))
(dynamic-wind
(lambda ()
(if (the-paused-tasks 'any-tasks?)
(begin
(newline)
(display "[Suspending tasks]"))))
thunk
(lambda ()
;; Nothing to do if no tasks were paused (or they were already
;; restarted through the pause object).
(cond ((not (the-paused-tasks 'any-tasks?)) '())
(discard-the-paused-tasks?
(newline) (display "[Discarding tasks]") (newline))
(else
(newline) (display "[Resuming tasks]") (newline)
(the-paused-tasks 'Restart-tasks))))))))
;; Dealing with recently suspended tasks
;; Sets the flag that makes WITH-TASKS-SUSPENDED drop the paused tasks
;; on exit instead of resuming them.
(define (discard-recently-suspended-tasks!)
(set! discard-the-paused-tasks? #!true))
;; Clears the discard flag, so WITH-TASKS-SUSPENDED resumes the paused
;; tasks on exit.
(define (prevent-discarding-processes!)
(set! discard-the-paused-tasks? #!false))
;; Execution within a selected task
;; WITHIN-PROCESS arranges for THUNK to be executed as part of the
;; process belonging to FUTURE.
;;   future - the process in which to run
;;   thunk  - procedure of no arguments
;; As in RUN, the decision is made under the future's lock and yields
;; a procedure that is called after the lock has been released.
(define (within-process future thunk)
(define (loop noisy?)
((LOCKING-FUTURE future true-future?
(if true-future?
(let ((status (future-ref future FUTURE-STATUS-SLOT))
(process (future-ref future FUTURE-PROCESS-SLOT)))
;; Already inside that process: just call the thunk.
(cond ((non-touching-eq? future (current-future))
thunk)
;; Running elsewhere: hit a breakpoint the first time round, then
;; keep retrying quietly.
((eq? status 'RUNNING)
(lambda ()
(if noisy?
(bkpt "WITHIN-PROCESS: process is running"))
(loop #!false)))
;; Otherwise wrap the saved process so that THUNK runs first and the
;; original process is then resumed with 'GO; queue it and run it.
(else
(future-set! future FUTURE-PROCESS-SLOT
(lambda (arg) (thunk) (process 'go)))
(more-work future)
(lambda () (run future)))))
(begin
(error "WITHIN-PROCESS: Not a process" future)
(lambda () (next)))))))
(loop #!true))
)) ; end of Make-Environment for Scheduler
; Export definitions to the world outside the scheduler
;; Each global name below is bound to the value of a name taken from
;; the SCHEDULER environment with ACCESS.
(define initialize-scheduler! (access initialize-scheduler! scheduler))
(define determine! (access determine! scheduler))
(define future-scheduler (access future-scheduler scheduler))
(define dfuture-scheduler (access dfuture-scheduler scheduler))
(define delay-scheduler (access delay-scheduler scheduler))
;; NOTE(review): the global NEXT is bound to the scheduler's
;; RESCHEDULE, not to its internal NEXT, and RESCHEDULE is not
;; exported under its own name -- confirm that this is intended.
(define next (access reschedule scheduler))
(define wait-until-idle (access wait-until-idle scheduler))
(define pause-everything (access pause-everything scheduler))
(define with-tasks-suspended (access with-tasks-suspended scheduler))
(define discard-recently-suspended-tasks!
(access discard-recently-suspended-tasks! scheduler))
(define prevent-discarding-processes!
(access prevent-discarding-processes! scheduler))
(define Current-Future (access Current-Future scheduler))
(define Futures-On? (access Futures-On? scheduler))
(define Futures-Off (access Futures-Off scheduler))
(define Saving-State (access Saving-State scheduler))
(define within-process (access within-process scheduler))
(define Disjoin (access disjoin scheduler))
(define Await-First-Of (access await-first-of scheduler))
;; FUTURES-ON initializes the scheduler with DFUTURE-SCHEDULER as the
;; creation scheduler.
;;   slice - optional time slice passed on as the scheduling interval;
;;           '() is passed when it is omitted
(define (futures-on #!optional slice)
  (initialize-scheduler! (if (unassigned? slice) '() slice)
                         dfuture-scheduler))
;; NON-TOUCHING-MEMQ is MEMQ with NON-TOUCHING-EQ? as the comparison:
;; it returns the first tail of LIST whose car matches ELEMENT, or
;; #!false if there is none.
(define (non-touching-memq element list)
  (let scan ((rest list))
    (if (null? rest)
        #!false
        (if (non-touching-eq? element (car rest))
            rest
            (scan (cdr rest))))))
;; NON-TOUCHING-ASSQ is ASSQ with NON-TOUCHING-EQ? as the comparison:
;; it returns the first entry of the association LIST whose key
;; matches ELEMENT, or #!false if there is none.
(define (non-touching-assq element list)
  (let scan ((entries list))
    (if (null? entries)
        #!false
        (if (non-touching-eq? element (caar entries))
            (car entries)
            (scan (cdr entries))))))
;; The FUTURE special form:  (future expression [doc [user-scheduler]])
;; expands into a call to the scheduler's SPAWN-PROCESS with
;;   - a thunk that evaluates EXPRESSION,
;;   - DOC, or, when DOC is omitted, the printed text of EXPRESSION
;;     (computed once, at expansion time), and
;;   - USER-SCHEDULER as the creation scheduler, if one was given.
;; Installed with ADD-SYNTAX! only, so it exists once this file has
;; been executed.
(add-syntax! 'future
(macro (expression #!optional doc user-scheduler)
`((ACCESS SPAWN-PROCESS SCHEDULER)
(LAMBDA () ,expression) ; Work to do
,(if (unassigned? doc) ; Documentation
(with-output-to-string
(lambda () (display expression)))
doc)
,@(if (unassigned? user-scheduler) ; Start-up procedure
'()
`(,user-scheduler)))))
;; Turn the futures system on as soon as this file is loaded.  No time
;; slice is given, so FUTURES-ON passes '() as the interval.
(futures-on)