[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

futures in cscheme



You need the runtime file "future.scm".  A version from an earlier
CScheme follows -- I've partially munged it to work with the 5.1 beta
release but there appear to be some remaining problems.  Please send
me back a copy if you get it to work better; I don't have time to work
on it right now.  I suspect that make-future needs to be changed to
use make-cheap-future but I'm not sure...

======================================================================
; This is -*- SCHEME -*- code

(declare (usual-integrations)
	 (compilable-primitive-functions
	  vector-set!
	  get-external-number
	  within-control-point))

;; Bindings for the microcode primitives used by the futures runtime.
;; Each name is bound, via MAKE-PRIMITIVE-PROCEDURE, to the primitive
;; whose (upper-case) name is quoted.  The primitives themselves live
;; in the microcode; their exact contracts are not visible in this file.

;; Work queue and cross-processor interrupt.
(define put-work (make-primitive-procedure 'PUT-WORK))
(define global-interrupt (make-primitive-procedure 'GLOBAL-INTERRUPT))

;; TOUCH and the "-IF-EQ?!" mutators.
;; NOTE(review): presumably compare-and-swap style operations; none of
;; the -IF-EQ?! procedures is called elsewhere in this file.
(define touch (make-primitive-procedure 'TOUCH))
(define set-car-if-eq?! (make-primitive-procedure 'SET-CAR-IF-EQ?!))
(define set-cdr-if-eq?! (make-primitive-procedure 'SET-CDR-IF-EQ?!))
(define vector-set-if-eq?! (make-primitive-procedure 'VECTOR-SET-IF-EQ?!))
(define set-cxr-if-eq?! (make-primitive-procedure 'SET-CXR-IF-EQ?!))

;; Slot access and locking for future objects.
(define future-ref (make-primitive-procedure 'FUTURE-REF))
(define future-set! (make-primitive-procedure 'FUTURE-SET!))
(define future-size (make-primitive-procedure 'FUTURE-SIZE))
(define lock-future! (make-primitive-procedure 'LOCK-FUTURE!))
(define unlock-future! (make-primitive-procedure 'UNLOCK-FUTURE!))

;; Identity test used by NON-TOUCHING-MEMQ / NON-TOUCHING-ASSQ below.
(define non-touching-eq? (make-primitive-procedure 'NON-TOUCHING-EQ?))

;; Processor / interpreter identification.  N-INTERPRETERS sizes the
;; per-interpreter "current future" vector and MY-INTERPRETER-NUMBER
;; indexes it.
(define n-interpreters (make-primitive-procedure 'N-INTERPRETERS))
(define my-processor-number (make-primitive-procedure 'MY-PROCESSOR-NUMBER))
(define my-interpreter-number (make-primitive-procedure 'MY-INTERPRETER-NUMBER))

;;	This is the stuff Anthony put here.

;; (define statistics-package
;;   (make-environment
;;     (define get-statistics
;; 	 (make-primitive-procedure 'get-statistics #!true))
;;     (define stat-names '((CONTENTION-COUNT      . 0)
;; 	 	            (GC-MASTER-IDLE-TIME   . 1)
;; 		            (GC-SLAVE-IDLE-TIME    . 2)
;; 		            (GC-LOOP-TIME          . 3)
;; 		            (GC-DAEMON-TIME        . 4)))
;;     (define This-load '())
;; 
;;     (define (load-statistics)
;;       (set! this-load (get-statistics)))
;;    
;;     (define (get-statistic name)
;;       (if (unassigned? this-load)
;; 	  (load-statistics))
;;       (vector-ref this-load (cdr (assq name stat-names))))
;; 
;;     (define (clear-statistics)
;;       (set! this-load))))
;; 
;; (define load-statistics (access load-statistics statistics-package))
;; (define get-statistic (access get-statistic statistics-package))
;; (define clear-statistics (access clear-statistics statistics-package))

;; Slots in a future
;;
;; Each constant below is the index of one slot of a future object, for
;; use with FUTURE-REF and FUTURE-SET!.

;; MICROCODE KNOWS ABOUT THESE

;; FUTURE-DETERMINED-SLOT is #!TRUE if the value is known and immutable,
;; #!FALSE if not yet known, else known but mutable (i.e. KEEP-SLOT).
(define FUTURE-DETERMINED-SLOT 0)

;; FUTURE-LOCK-SLOT is #!TRUE if the future is locked by a process
(define FUTURE-LOCK-SLOT 1)

;; The next two are mutually exclusive and deliberately share index 2.
;; The VALUE is used if DETERMINED is not #!FALSE.  The QUEUE contains
;; a WEAK queue of processes waiting for a value to appear if
;; DETERMINED is #!FALSE.
(define FUTURE-VALUE-SLOT 2)
(define FUTURE-QUEUE-SLOT 2)

; REFERENCED ONLY BY THE RUNTIME SYSTEM

;; Code to run to re-activate this process
(define FUTURE-PROCESS-SLOT 3)

;; The FUTURE-STATUS-SLOT contains one of:
;;   RUNNING:    Actually in possession of a processor
;;   WAITING:    Stopped waiting for the value of a future
;;   PAUSED:     Stopped by PAUSE-EVERYTHING
;;   DELAYED:    Created by delay scheduler and not yet run
;;   RUNNABLE:   Available for execution
;;   DETERMINED: Value has been set and process is finished
;;   CREATED:    Future newly created
(define FUTURE-STATUS-SLOT 4)

;; For debugging purposes, the original thunk to be executed.
(define FUTURE-ORIG-CODE-SLOT 5)

;; Per-process private data.  MAKE-FUTURE stores either the process
;; name or a one-element vector holding a console channel here.
(define FUTURE-PROCESS-PRIVATE-SLOT 6)

;; If this process has status WAITING this is a (strong) list of the
;; futures on which it is waiting.
(define FUTURE-WAITING-ON-SLOT 7)

;; For GC metering (not used by simulator)
(define FUTURE-METER-SLOT 8)

;; For users:
(define FUTURE-USER-SLOT 9)

; Some useful macros for dealing with atomicity.  Notice that
;;DEFINE-MACRO happens when the text is turned into code (i.e.
;;at syntax time), while ADD-SYNTAX! happens only when the program
;;is actually executed.  So both are used when this file uses the
;;macro, but only ADD-SYNTAX! is used for user macros which
;;are not referenced here.

;; ADD-SYNTAX! expands into a SYNTAX-TABLE-DEFINE form that installs
;; EXPANDER under NAME in SYSTEM-GLOBAL-SYNTAX-TABLE.
(define-macro (add-syntax! name expander)
  (list 'SYNTAX-TABLE-DEFINE
	'SYSTEM-GLOBAL-SYNTAX-TABLE
	name
	expander))

; ATOMIC takes a list of expressions and guarantees that they
;;are evaluated without interrupts: the expressions become the body
;;of a thunk which is handed to WITHOUT-INTERRUPTS.

;; Syntax-time version, used by the rest of this file.
(define-macro (atomic . expressions)
  (list 'WITHOUT-INTERRUPTS
	(cons 'LAMBDA (cons '() expressions))))

;; Run-time registration of the same expander, for user code.
(add-syntax! 'ATOMIC
  (macro expressions
    (list 'WITHOUT-INTERRUPTS
	  (cons 'LAMBDA (cons '() expressions)))))

;; PROG1 is like the MACLISP macro of the same name: evaluate the first
;; expression, then the remaining ones in order, and return the value of
;; the first.
;; NOTE: the expansion binds the identifier FIRST around the remaining
;; expressions, so those expressions must not refer to an outer variable
;; named FIRST.
(define-macro (PROG1 . exprs)
  `(LET ((FIRST ,(CAR exprs)))
     ,@(CDR exprs)
     FIRST))

; DEFINE-ATOMIC is like the procedural version of DEFINE, except
;;that the whole body is wrapped in ATOMIC (and therefore runs inside
;;WITHOUT-INTERRUPTS).

;; Syntax-time version, used by the rest of this file.
(define-macro (define-atomic arg-template . body)
  (list 'DEFINE arg-template (cons 'ATOMIC body)))

;; Run-time registration of the same expander, for user code.
(add-syntax! 'DEFINE-ATOMIC
  (macro (arg-template . body)
    (list 'DEFINE arg-template (cons 'ATOMIC body))))

; LOCKING-FUTURE is the same as ATOMIC except that it also wraps
;;a LOCK-FUTURE! and UNLOCK-FUTURE! around the expression(s).
;;LOCKED? is a flag which can be used in BODY -- it will be #!true
;;if the future is still valid (you hang until you can lock it),
;;or #!false if it has been spliced out.
;;
;;The expansion is a call to WITH-FUTURE-LOCKED, passing FUTURE and a
;;one-argument procedure whose parameter is LOCKED? and whose body is
;;BODY.  The value of the form is whatever that procedure returns.
;;NOTE(review): the template refers to the upper-case parameters in
;;lower case, so this relies on the reader folding symbol case.

;; Syntax-time version, used by the rest of this file.
(define-macro (LOCKING-FUTURE FUTURE LOCKED? . BODY)
  `(WITH-FUTURE-LOCKED ,future
      (LAMBDA (,locked?) . ,body)))

;; Run-time registration of the same expander, for user code.
(add-syntax! 'LOCKING-FUTURE
  (macro (FUTURE LOCKED? . BODY)
    `(WITH-FUTURE-LOCKED ,future
       (LAMBDA (,locked?) . ,body))))

;; WITH-STATE runs BODY with STATE bound to the object that
;; NON-REENTRANT-TASK-CATCH passes to its argument procedure.  Callers
;; in this file store that object in a future's PROCESS slot and later
;; resume the task by applying it to one argument.
;; This macro is defined only at syntax time (no ADD-SYNTAX!), and the
;; expansion names NON-REENTRANT-TASK-CATCH freely, so it is only
;; usable where that name is bound (inside the SCHEDULER environment).
(define-macro (WITH-STATE STATE . BODY)
  `(NON-REENTRANT-TASK-CATCH (LAMBDA (,state) . ,body)))

;; WITH-FUTURE-LOCKED is the procedure behind the LOCKING-FUTURE macro.
;; It runs with interrupts off.  If LOCK-FUTURE! succeeds on FUTURE,
;; THUNK is applied to #!true, the future is unlocked again, and
;; THUNK's value is returned.  Otherwise THUNK is applied to #!false
;; and no unlock is attempted.
(DEFINE-ATOMIC (with-future-locked future thunk)
  (cond ((lock-future! future)
	 (let ((value-while-locked (thunk #!true)))
	   (unlock-future! future)
	   value-while-locked))
	(else
	 (thunk #!false))))


(define scheduler
  (make-environment
    (declare (usual-integrations)
	     (compilable-primitive-functions
	      (weak-car system-pair-car)
	      (weak-cdr system-pair-cdr)
	      (weak-set-car! system-pair-set-car!)
	      (weak-set-cdr! system-pair-set-cdr!)))

    ;; Microcode type code used by LEGITIMATE-PROCESS? to recognise
    ;; control points (saved continuations).
    (define control-point-type
      (microcode-type 'CONTROL-POINT))

    ;; STI = "setup timer interrupt".  Called as (sti 0 interval) to arm
    ;; the timer and as (sti '() '()) to disarm it.
    (define sti
      (make-primitive-procedure 'setup-timer-interrupt #!true))

    ;; Empties the work queue; PAUSE-EVERYTHING treats the result as a
    ;; weak list of futures.
    (define drain-work-queue!
      (make-primitive-procedure 'drain-work-queue!))

    ;; Microcode type code used by WEAK-CONS to build queue cells.
    (define weak-cons-type
      (microcode-type 'WEAK-CONS))

    ;; Declared here, assigned below once CATCH-MAKER is available.
    (define non-reentrant-task-catch)

    (define task-catch)

    (define non-reentrant-call/cc
      (make-primitive-procedure 'non-reentrant-call-with-current-continuation))

    (define call/cc
      (make-primitive-procedure 'call-with-current-continuation))

    (define set-current-dynamic-state!
      (make-primitive-procedure 'set-current-dynamic-state!))

    ;; Borrowed from the continuation package; builds the two catch
    ;; procedures assigned at the end of this block.
    (define catch-maker
      (access catch-maker continuation-package))

    ;; Scheduler state.  The first four are left unassigned until
    ;; INITIALIZE-SCHEDULER! (or WITH-TASKS-SUSPENDED) sets them.
    (define current-Future-Vector)	; Per-interpreter vector of running futures
    (define the-paused-tasks)		; Tasks being suspended temporarily
    (define Start-Process)		; Default scheduler for FUTURE creation
    (define Idle-Future)		; Future to wait until idle on
    (define discard-the-paused-tasks? #!false) ; Throw away tasks?
    (define preempting? #!false)	; No timer currently set
    (define Delta '())			; Scheduling frequency, centi-seconds

    
    ;; Used by WITH-STATE: catch built on the non-reentrant call/cc.
    (set! non-reentrant-task-catch
	  (catch-maker non-reentrant-call/cc set-current-dynamic-state! #!true))

    ;; Used by AWAIT-INTERNAL, which needs a re-entrant continuation.
    (set! task-catch
	  (catch-maker call/cc set-current-dynamic-state! #!false))

    ;; LEGITIMATE-PROCESS? is true when OBJECT can be applied to resume
    ;; a task: either an ordinary procedure or a control point.
    (define (legitimate-process? object)
      (let ((is-procedure? (procedure? object)))
	(if is-procedure?
	    is-procedure?
	    (primitive-type? control-point-type object))))
    
    ;; START-PREEMPTING turns on time-sliced scheduling with the given
    ;; INTERVAL.  It runs with interrupts off.  If preemption is already
    ;; on it only prints a message.
    ;;
    ;; The installed TIMER-INTERRUPT handler captures the interrupted
    ;; task's state, stores it in the current future's PROCESS slot,
    ;; puts that future back on the work queue with MORE-WORK, and then
    ;; calls NEXT to pick up other work.  If the current future cannot
    ;; be locked, preemption is stopped and a breakpoint is entered.
    (DEFINE-ATOMIC (start-preempting interval)
      (if (not preempting?)
	  (begin
	    (set! timer-interrupt
		  (lambda ()
		    (let ((My-Task (Current-Future)))
		      (WITH-STATE me
			(LOCKING-FUTURE My-Task I-am-running?
			  (if I-am-running?
			      (begin
				;; Save where to resume, then requeue.
				(future-set! My-Task FUTURE-PROCESS-SLOT me)
				(more-work My-Task))
			      (begin
				(stop-preempting)
				(bkpt "TIMER: Existential crisis!"))))
			;; Inside the catch: a resumed task returns from
			;; WITH-STATE and does not come back through here.
			(next)))))
	    (set! preempting? #!true)
	    (set! delta interval)
	    (sti 0 interval))
	  (display "Already preempting when START-PREEMPTING called")))

    ;; STOP-PREEMPTING disarms the timer (STI with null arguments) and
    ;; clears the PREEMPTING? flag.  DELTA is left untouched, so
    ;; MORE-WORK will re-arm preemption the next time it is called.
    (DEFINE-ATOMIC (stop-preempting)
      (sti '() '())
      (set! preempting? #!false))
    
    ;; MAKE-FUTURE builds a fresh, undetermined future object.
    ;;   ORIG-CODE       stored in the PROCESS slot (what RUN will apply)
    ;;   USER-PROCEDURE  stored in the ORIG-CODE slot, for debugging
    ;;   NAME            documentation for the process
    ;; The object is a ten-element vector, laid out in the order of the
    ;; FUTURE-...-SLOT constants, retagged with the FUTURE microcode type.
    (define make-future
      (let ((future-type (microcode-type 'FUTURE)))
 	(named-lambda (make-future orig-code user-procedure name)
	  (primitive-set-type
	   future-type
	   (vector #!false		; DETERMINED: No value yet
		   #!false		; LOCK:       Not locked
		   (make-empty-queue)
					; VALUE/QUEUE:No waiters
		   orig-code		; PROCESS:    How to resume
		   'CREATED		; STATUS:     Ready to go
		   user-procedure	; ORIG_CODE:  For debugging
		   ;; If OPEN-CONSOLE-CHANNEL exists, give the process
		   ;; its own channel; otherwise just record the name.
		   (if (unbound? open-console-channel)
		       name
		       (vector
			(open-console-channel name)))
					; PROCESS-PRIVATE: Butterfly??
		   '()			; WAITING-ON: Not waiting
		   0			; METER: Ignored by simulator
		   '())))))		; USER-SLOT

    ;; MORE-WORK marks the future WORK as RUNNABLE and puts it on the
    ;; work queue.  If a scheduling interval (DELTA) is set but the
    ;; timer is not currently running, preemption is restarted.
    ;; The status is written without taking the future's lock; callers
    ;; that need the lock wrap the call in LOCKING-FUTURE.
    (define (more-work work)
      (future-set! work FUTURE-STATUS-SLOT 'RUNNABLE)
      (put-work work)
      (if (and delta (not preempting?))
	  (start-preempting delta)))

    ;; SPAWN-PROCESS creates a future that will evaluate THUNK and hands
    ;; it to a creation scheduler.  It is the target of the FUTURE
    ;; special form.
    ;;   THUNK  procedure of no arguments giving the work to do
    ;;   DOC    documentation, passed to MAKE-FUTURE as the name
    ;;   START  optional creation scheduler; defaults to START-PROCESS
    ;; Returns the new future.
    (define spawn-process
      (let ((make-initial-process
	     (make-primitive-procedure 'make-initial-process)))
	(named-lambda (spawn-process thunk doc #!optional start)
	  (let ((object)
		;; Captured now so the child starts out in the
		;; creator's dynamic state.
		(dynamic-state (current-dynamic-state)))
	    (set! object
		  (make-future
		   (make-initial-process
		    (lambda ()
		      (set-current-dynamic-state! dynamic-state)
		      (thunk)))
		   thunk doc))
	    ((if (unassigned? start) start-process start) object)
	    object))))

    ;; END-OF-COMPUTATION-HANDLER is installed by INITIALIZE-SCHEDULER!
    ;; as the microcode's END-OF-CONTINUATION termination procedure.
    ;; It determines the current future to VALUE (passing #!false for
    ;; KEEP-SLOT?) and then looks for more work.  EXPRESSION and
    ;; ENVIRONMENT are accepted but not used.
    (DEFINE-ATOMIC (end-of-computation-handler expression environment value)
      (let ((me (current-future)))
	(determine! me value #!false))
      (next))
    
    ;; DETERMINE! gives FUTURE its VALUE and wakes the tasks waiting on it.
    ;;   FUTURE      the future to determine
    ;;   VALUE       the value to store; also the return value
    ;;   KEEP-SLOT?  optional.  If omitted, the DETERMINED slot becomes
    ;;               #!true only if it was #!false before.  If supplied,
    ;;               the slot becomes KEEP-SLOT when true, else #!true.
    ;; Signals an error if the future was already immutably determined,
    ;; or if it could not be locked at all.
    (define (determine! future value #!optional keep-slot?)
      ;; AWAKEN! is called with a queue (of processes waiting
      ;; for a future) and promotes them all to runnable status.
      ;; A queued task is only woken if it can be locked, is still
      ;; WAITING, and still lists FUTURE among the futures it waits on.
      (define (awaken! queue)
	(let loop ()
	  (if (empty-queue? queue)
	      'DONE
	      (let ((next-item (dequeue! queue)))
		(LOCKING-FUTURE next-item item-runnable?
		  (if (and
		       item-runnable?
		       (eq? (future-ref next-item FUTURE-STATUS-SLOT) 'WAITING)
		       (non-touching-memq future (future-ref next-item FUTURE-WAITING-ON-SLOT)))
		      (begin
			;; Record which future did the waking;
			;; AWAIT-INTERNAL reads this back.
			(future-set! next-item FUTURE-WAITING-ON-SLOT future)
			(more-work next-item))))
		(loop)))))
      (LOCKING-FUTURE future was-still-a-future?
	(if was-still-a-future?
	    ;; WAITERS must be read before the value is stored: the
	    ;; queue and the value occupy the same slot.
	    (let ((known? (future-ref future FUTURE-DETERMINED-SLOT))
		  (waiters (future-ref future FUTURE-QUEUE-SLOT)))
	      (if (eq? known? #!true)
		  (error "Future cannot be determined twice." future))
	      (future-set! future FUTURE-VALUE-SLOT value)
	      (future-set! future FUTURE-STATUS-SLOT 'DETERMINED)
	      (if (unassigned? keep-slot?)
		  (if (eq? known? #!false)
		      (future-set! future FUTURE-DETERMINED-SLOT #!true))
		  (future-set! future FUTURE-DETERMINED-SLOT
			       (if keep-slot? 'KEEP-SLOT #!true)))
	      ;; Only a previously undetermined future has a wait queue.
	      (if (not known?) (awaken! waiters)))
	    (error "Future cannot be determined twice." future)))
      value)
    
    ;; FUTURES-ON? is true once CURRENT-FUTURE-VECTOR has been assigned,
    ;; i.e. after INITIALIZE-SCHEDULER! and before FUTURES-OFF.
    (define (Futures-On?) (not (unassigned? Current-Future-Vector)))

    ;; FUTURES-OFF pauses every processor and then makes
    ;; CURRENT-FUTURE-VECTOR unassigned again, which is what
    ;; FUTURES-ON? tests.  The pause object is discarded.
    (define (Futures-Off)
      (pause-everything)
      (set! Current-Future-Vector)
      'FUTURES-TURNED-OFF)

    ;; CURRENT-FUTURE returns this interpreter's entry in
    ;; CURRENT-FUTURE-VECTOR, or '() when futures are off.  The entry
    ;; is normally a future but may be a marker symbol such as
    ;; NO-FUTURE-YET, WAITING-FOR-WORK, STATE-SAVED or PAUSED.
    (define (Current-Future)
      (if (Futures-On?)
	  (vector-ref Current-Future-Vector (My-Interpreter-Number))
	  '()))

    ;; SET-CURRENT-FUTURE! stores FUTURE as this interpreter's entry.
    ;; It does not check FUTURES-ON?.
    (define (Set-Current-Future! Future)
      (vector-set! Current-Future-Vector (My-Interpreter-Number)
		   Future))

    ;; INITIALIZE-SCHEDULER! (re)starts the futures system.
    ;;   INTERVAL           optional; if supplied it becomes DELTA, the
    ;;                      scheduling interval in centi-seconds
    ;;   DEFAULT-SCHEDULER  optional creation scheduler; defaults to
    ;;                      DFUTURE-SCHEDULER
    ;;   NON-ABORTING?      optional; if true, return normally instead
    ;;                      of aborting to the top-level driver
    ;; Returns DELTA (or NOT-PREEMPTIVE-SCHEDULING when DELTA is false)
    ;; in the non-aborting case.  Otherwise it does not return: it
    ;; calls ABORT-TO-TOP-LEVEL-DRIVER with a status message.
    (define (initialize-scheduler!
	     #!optional interval default-scheduler non-aborting?)
      (let ((set-fixed-objects-vector!
	     (make-primitive-procedure 'set-fixed-objects-vector!)))
	(pause-everything)		; Stop all processors & drain queue
	;; Install END-OF-COMPUTATION-HANDLER as the END-OF-CONTINUATION
	;; termination procedure, creating the handler vector first if
	;; the fixed-objects slot holds an empty one.
	(let ((termination-handlers
	       (vector-ref (get-fixed-objects-vector)
			   (fixed-objects-vector-slot
			    'MICROCODE-TERMINATIONS-PROCEDURES))))
	  (if (= (vector-length termination-handlers) 0)
	      (begin
		(set! termination-handlers 
		      (vector-cons number-of-microcode-terminations '()))
		(vector-set! (get-fixed-objects-vector)
			     (fixed-objects-vector-slot
			      'MICROCODE-TERMINATIONS-PROCEDURES)
			     termination-handlers)
		(set-fixed-objects-vector! (get-fixed-objects-vector))))
	  (vector-set! termination-handlers
		       (microcode-termination 'END-OF-CONTINUATION)
		       end-of-computation-handler))
	(set! Start-Process
	      (if (unassigned? default-scheduler)
		  dfuture-scheduler
		  default-scheduler))
	;; Assigning this vector is what makes FUTURES-ON? true.
	(set! Current-Future-Vector
	      (vector-cons (N-Interpreters) 'NO-FUTURE-YET))
	(set! Idle-Future (make-future 'NO-PROCESS 'NO-PROCESS "Idle-Loop"))

	(if (not (unassigned? interval)) (set! delta interval))
	;; Register AWAIT-FUTURE as the SCHEDULER entry of the
	;; fixed-objects vector.
	(let ((fobj (get-fixed-objects-vector)))
	  (vector-set! fobj (fixed-objects-vector-slot 'SCHEDULER)
		       await-future)
	  (set-fixed-objects-vector! fobj))
	;; The caller becomes the initial, already RUNNING, process.
	(Set-Current-Future!
	 (make-future 'INITIAL-PROCESS 'INITIAL-PROCESS "The Initial Process"))
	(future-set! (current-future) FUTURE-STATUS-SLOT 'RUNNING)
	;; Interrupt handler passed to GLOBAL-INTERRUPT: restore the
	;; interrupt mask, then go looking for work.
	(global-interrupt
	 1
	 (lambda (IntCode IntEnb)
	   (set-interrupt-enables! IntEnb)
	   (next))
	 (lambda () #!true))
	(if (or (unbound? abort-to-top-level-driver)
		(and (not (unassigned? non-aborting?))
		     non-aborting?))
	    (or Delta 'NOT-PREEMPTIVE-SCHEDULING)
	    ;; The message shows DELTA as seconds with two decimals:
	    ;; whole seconds, then the tenths and hundredths digits.
	    ;; A negative DELTA is reported as "real" time, a positive
	    ;; one as "runtime".
	    (abort-to-top-level-driver
	     (cond ((unbound? format) "^G to restart the futures")
		   ((not Delta) "^G: no preemptive scheduling")
		   ((negative? Delta)
		    (format () "^G: scheduling ~o.~o~o (real) secs."
			    (quotient (abs Delta) 100)
			    (remainder (quotient (abs Delta) 10) 10)
			    (remainder (remainder (abs Delta) 10) 10)))
		   (else
		    (format () "^G: scheduling ~o.~o~o (runtime) secs."
			    (quotient Delta 100)
			    (remainder (quotient Delta 10) 10)
			    (remainder (remainder Delta 10) 10))))))))
    
					; Scheduling support

    ;; NEXT abandons whatever this interpreter was doing and runs the
    ;; next future obtained from GET-WORK.  The procedure passed to
    ;; GET-WORK is the fallback: it stops preemption, determines
    ;; IDLE-FUTURE to DONE (waking anything in WAIT-UNTIL-IDLE),
    ;; replaces it with a fresh idle future, and asks for work once
    ;; more; the innermost fallback signals "No Work Available".
    ;; NOTE(review): presumably GET-WORK calls the fallback only when
    ;; the work queue is empty -- confirm against the microcode.
    (define (next)
      (let ((get-work (make-primitive-procedure 'get-work)))
	(Set-Current-Future! 'WAITING-FOR-WORK)
	(run (get-work
	      (named-lambda (loop)
		(stop-preempting)
		(determine! Idle-Future 'DONE)
		(set! Idle-Future
		      (make-future 'NO-PROCESS 'NO-PROCESS "Idle Loop"))
		(run (get-work
		      (lambda ()
			(error "No Work Available")
			(loop)))))))))

    ;; RUN starts a process running.
    ;;
    ;; While holding FUTURE's lock it swaps this interpreter's number
    ;; into the PROCESS slot and RUNNING into the STATUS slot, relying
    ;; on FUTURE-SET! to return each slot's previous contents.  If the
    ;; previous process was resumable and the previous status was
    ;; RUNNABLE, the future becomes current, the timer is restarted for
    ;; a full interval, and the saved process is applied to YOUR-TURN.
    ;; Otherwise both slots are restored and NEXT is called instead.
    ;;
    ;; The locked section only chooses a procedure of no arguments; the
    ;; outer application calls it after the lock has been released.

    (define (run future)
      ((LOCKING-FUTURE future Still-A-Future?
	 (if Still-A-Future?
	     (let ((new-process
		    (future-set! future
				 FUTURE-PROCESS-SLOT (My-Interpreter-Number)))
		   (old-status (future-set! future FUTURE-STATUS-SLOT 'RUNNING)))
	       (if (and (legitimate-process? new-process)
			(eq? old-status 'RUNNABLE))
		   (begin
		     (Set-Current-Future! future)
		     (if (and delta preempting?)
			 (sti 0 delta))	; Full time interval
		     (lambda () (new-process 'YOUR-TURN)))
		   (begin
		     ;; Not runnable after all: undo both swaps.
		     (future-set! future FUTURE-STATUS-SLOT old-status)
		     (future-set! future FUTURE-PROCESS-SLOT new-process)
		     next)))
	     next))))
    
    ;; AWAIT-FUTURE suspends the current process and adds it to the
    ;; queue waiting for the specified future to get a value.  The
    ;; thunk, if specified, is executed immediately before going off for
    ;; more work to do (i.e. after all of the enqueuing work, etc. is done).
    ;;
    ;; If FUTURE cannot be locked or is already determined, nothing is
    ;; enqueued: the thunk (if any) runs and the caller resumes at once
    ;; by applying its own saved state to DONE.  A future whose status
    ;; is DELAYED or PAUSED is put on the work queue first so that it
    ;; will eventually produce the value being waited for.
    ;; INITIALIZE-SCHEDULER! installs this procedure as the SCHEDULER
    ;; entry of the fixed-objects vector.

    (DEFINE-ATOMIC (await-future future #!optional thunk)
      (WITH-STATE me
	(let
	    ((perform-normal-return?
	      (LOCKING-FUTURE future waiting-for-a-future?
		(if (or (not waiting-for-a-future?) ; Already determined
			(future-ref future FUTURE-DETERMINED-SLOT))
		    #!TRUE		; Return normally
		    (let ((My-Task (Current-Future))
			  (status (future-ref future FUTURE-STATUS-SLOT)))
		      (if (or (eq? status 'DELAYED) (eq? status 'PAUSED))
			  (more-work future))
		      (LOCKING-FUTURE My-Task I-am-running?
			(if I-am-running?
			    (begin
			      ;; Record how to resume, what we wait
			      ;; for, and join FUTURE's wait queue.
			      (future-set! My-Task FUTURE-PROCESS-SLOT me)
			      (future-set! My-Task FUTURE-STATUS-SLOT 'WAITING)
			      (future-set! My-Task FUTURE-WAITING-ON-SLOT
					   (list future)) 
			      (enqueue! (future-ref future FUTURE-QUEUE-SLOT) My-Task))
			    (display "AWAIT-FUTURE: Existential crisis!")))
		      #!FALSE)))))	; Don't return normally
	  (if (not (unassigned? thunk)) (thunk)) ; Do the optional work
	  (if perform-normal-return?	; Done or find more work
	      (me 'DONE)
	      (next)))))
    

    ;; AWAIT-FUTURE-AFTER-ACTION puts the current process on the wait
    ;; queue of FUTURE, runs ACTION, and then suspends.  Its purpose is
    ;; to ensure that the process is actually on the wait queue of the
    ;; future when the action takes place.  This prevents a race condition
    ;; which might cause problems if the action is intended to determine
    ;; the future (e.g., externally) and wake up the process.  In other words,
    ;; if the process has not been added to the wait queue when another
    ;; process or an external interrupt determines the future, the event 
    ;; would not wake up the process as intended.
    ;;
    ;; This was added to support the new console i/o system.  Normally
    ;; the future will have been explicitly constructed by the user and
    ;; is not being determined by any other process.
    ;;
    ;; In this version it is simply AWAIT-FUTURE with ACTION passed as
    ;; the optional thunk, so all of AWAIT-FUTURE's checks still apply
    ;; and ACTION runs even when FUTURE is already determined.

    (define (await-future-after-action future action)
      (await-future future action))
    
    ;; AWAIT-INTERNAL implements DISJOIN / AWAIT-FIRST-OF.
    ;;   MY-TASK  a fresh future that stands for "the first of FUTURES"
    ;;   FUTURES  list of futures to wait on
    ;; Returns MY-TASK.  MY-TASK is determined, at most once, to the
    ;; first member of FUTURES found to be available (the future
    ;; object itself is passed to DETERMINE!).
    ;;
    ;; MY-TASK is given a re-entrant continuation as its process and is
    ;; enqueued on every member of FUTURES that is still undetermined.
    ;; The first time through, TASK-CATCH returns FALL-THROUGH-TAG and
    ;; MY-TASK is returned to the caller.  Each later wake-up re-enters
    ;; the continuation with another value, announces the future that
    ;; DETERMINE! recorded in the WAITING-ON slot, and calls NEXT.
    (define await-internal
      (let ((fall-through-tag (cons 'FALL-THROUGH '())))
	(named-lambda (await-internal My-Task futures)
	  (let ((Value-Known? #!false))
	    ;; Determine MY-TASK only on the first call.  Relies on
	    ;; SET! returning the variable's previous value.
	    (define (announce-value value)
	      (if (not (set! Value-Known? #!true))
		  (determine! My-Task value)))
	    ;; Enqueue MY-TASK on each disjunct in turn.  Stops at the
	    ;; first one that is already determined (or cannot be
	    ;; locked), announcing that one as the value.
	    (define (spawn-processes disjuncts)
	      (if (null? disjuncts)
		  'DONE
		  (let ((this-future (car disjuncts)))
		    (if (LOCKING-FUTURE this-future really-a-future?
			  (if (and really-a-future?
				   (null?
				    (future-ref
				     this-future FUTURE-DETERMINED-SLOT)))
			      (begin
				(enqueue!
				 (future-ref this-future FUTURE-QUEUE-SLOT)
				 My-Task)
				#!true)
			      (begin
				(announce-value this-future)
				#!false)))
			(spawn-processes (cdr disjuncts))))))
	    (if (eq? fall-through-tag
		     (task-catch (lambda (me) ; Deliberately re-entrant
				   (future-set! My-Task FUTURE-PROCESS-SLOT me)
				   (future-set! My-Task FUTURE-STATUS-SLOT 'WAITING)
				   (future-set! My-Task FUTURE-WAITING-ON-SLOT futures)
				   (spawn-processes futures)
				   fall-through-tag)))
		My-Task
		(begin
		  (announce-value (future-ref My-Task FUTURE-WAITING-ON-SLOT))
		  (next)))))))

    ;; DISJOIN is the variable-arity form of AWAIT-FIRST-OF: it collects
    ;; its arguments into a list and returns a new "Disjoin" future
    ;; that is determined by the first of them to become available.
    (define (disjoin . futures)
      (await-first-of futures))

    ;; AWAIT-FIRST-OF takes a list of futures and returns a new future,
    ;; named "Disjoin", which AWAIT-INTERNAL arranges to be determined
    ;; by the first member of FUTURES to become available.
    (define (await-first-of futures)
      (await-internal (make-future 'DISJOIN 'DISJOIN "Disjoin") futures))
    
					; Special scheduler operations

    ;; RESCHEDULE allows me to give up my processor slice and
    ;; wait until the scheduler gets back to me.
    ;;
    ;; The caller's state is saved in the current future's PROCESS slot
    ;; and the future is put back on the work queue; NEXT then picks up
    ;; other work.  If the current future cannot be locked, nothing is
    ;; saved and NOT-CURRENTLY-RUNNING-A-FUTURE is returned.
    ;; This procedure is exported to the outside world under the name
    ;; NEXT.

    (define-atomic (reschedule)
      (let ((my-task (current-future)))
	(WITH-STATE me
	  (if (LOCKING-FUTURE my-task am-I-running?
		(if am-I-running?
		    (begin
		      (future-set! my-task FUTURE-PROCESS-SLOT me)
		      (more-work my-task)))
		;; The lock flag is the value of the locked section.
		am-I-running?)
	      (next)
	      'NOT-CURRENTLY-RUNNING-A-FUTURE))))
	      

    ;; WAIT-UNTIL-IDLE puts the calling process to sleep until there
    ;; are no other active processes.  It does so by touching
    ;; IDLE-FUTURE, which NEXT determines (to DONE) when it finds no
    ;; work available.

    (define (wait-until-idle) (touch idle-future))

    ;; DFUTURE-SCHEDULER is a future creation scheduler which
    ;; defers the child process and continues on with the parent.
    ;; Note that all creation schedulers are called as part of
    ;; the parent process, so this is the easy case: the child is
    ;; simply queued with MORE-WORK.  This is the default creation
    ;; scheduler chosen by INITIALIZE-SCHEDULER!.

    (DEFINE-ATOMIC (dfuture-scheduler future)
      (more-work future)
      'CHILD-QUEUED-FOR-EXECUTION)

    ;; FUTURE-SCHEDULER is a future creation scheduler which
    ;; defers the parent process and continues on with the child.
    ;; This is a little harder than DFUTURE, since it is called
    ;; running as the parent.
    ;;
    ;; The parent's state is captured with WITH-STATE, stored in its
    ;; PROCESS slot, and the parent is put on the work queue.  The
    ;; child is then queued and run on this processor.
    ;;
    ;; The hand-off to the child must happen INSIDE the WITH-STATE
    ;; body.  When the parent is resumed later, control returns from
    ;; the WITH-STATE form; any code after that form would run again
    ;; on resumption.  With the hand-off outside, a resumed parent
    ;; re-queued the child (MORE-WORK overwrites its status) and
    ;; called RUN on it a second time, which could restart the child
    ;; or send the parent off to NEXT and abandon it.  RESCHEDULE and
    ;; AWAIT-FUTURE already keep their switch inside the catch.

    (DEFINE-ATOMIC (future-scheduler future)
      (WITH-STATE parent-process
	(let ((My-Future (Current-Future)))
	  (LOCKING-FUTURE My-Future Still-Runnable?
	    (if Still-Runnable?
		(begin
		  (future-set! My-Future FUTURE-PROCESS-SLOT parent-process)
		  (more-work My-Future)))))
	;; Reached only on the way out, never on resumption.
	(more-work future)
	(run future)))

    ;; DELAY-SCHEDULER is a future creation scheduler which defers
    ;; execution of the newly created future until it is first
    ;; touched.  It only marks the future DELAYED; AWAIT-FUTURE puts a
    ;; DELAYED future on the work queue when some process waits on it.

    (DEFINE-ATOMIC (delay-scheduler future)
      (future-set! future FUTURE-STATUS-SLOT 'DELAYED)
      'OK-I-DELAYED-IT)
    
    ;; Queue Abstraction
    ;;
    ;; -------------------------------
    ;; | Tail Pointer | Head Pointer |
    ;; -------------------------------
    ;;        |             |
    ;;        |             |
    ;;        V             V    
    ;;      -----  -----  -----
    ;;      | |=|=>| |=|=>| |/| add new items by clobbering '()
    ;;      -----  -----  -----
    ;; remove from start of list
    ;; (The list itself is made from WEAK cons cells)
    ;;
    ;; Note the naming: the "head" pointer designates the cell where
    ;; new items are appended (the last cell of the list), and the
    ;; "tail" pointer designates the cell that DEQUEUE! removes next
    ;; (the first cell of the list).
    ;;
    ;; The queue is empty when Tail=Head=#!NULL
    ;; (thus it has one item when Tail=Head but they are not #!NULL)
    ;;
    ;; These operations assume that the caller has arranged for any
    ;; desired atomicity.

    ;; Weak pairs are system pairs tagged with the WEAK-CONS type.
    (define (weak-cons a b)
      (system-pair-cons weak-cons-type a b))
    (define weak-car system-pair-car)
    (define weak-cdr system-pair-cdr)
    (define weak-set-car! system-pair-set-car!)
    (define weak-set-cdr! system-pair-set-cdr!)
    
    ;; The queue header is an ordinary pair: CAR holds the head
    ;; pointer, CDR the tail pointer.
    (define (make-empty-queue) (cons '() ()))
    (define queue-head-ptr car)
    (define queue-tail-ptr cdr)
    (define set-queue-head-ptr! set-car!)
    (define set-queue-tail-ptr! set-cdr!)

    (define (empty-queue? queue) (null? (queue-head-ptr queue)))
    
    ;; ENQUEUE! appends OBJECT to QUEUE in a fresh weak cell.  On an
    ;; empty queue both pointers end up at the new cell; otherwise the
    ;; new cell is linked after the current last cell and the head
    ;; pointer is advanced to it.
    (define (enqueue! queue object)
      (let ((last-cell (queue-head-ptr queue))
	    (new-cell (weak-cons object '())))
	(cond ((null? last-cell)
	       (set-queue-head-ptr! queue new-cell)
	       (set-queue-tail-ptr! queue new-cell))
	      (else
	       (weak-set-cdr! last-cell new-cell)
	       (set-queue-head-ptr! queue new-cell)))))
    
    ;; DEQUEUE! removes and returns the oldest object in QUEUE.  It
    ;; signals "Queue empty" if there is nothing to remove.  Removing
    ;; the only remaining cell resets both pointers to '().
    (define (dequeue! queue)
      (let ((first-cell (queue-tail-ptr queue)))
	(cond ((null? first-cell)
	       (error "Queue empty" queue))
	      (else
	       (let ((oldest (weak-car first-cell)))
		 (cond ((null? (weak-cdr first-cell))
			(set-queue-head-ptr! queue '())
			(set-queue-tail-ptr! queue '()))
		       (else
			(set-queue-tail-ptr! queue (weak-cdr first-cell))))
		 oldest)))))
    
    ;; SAVING-STATE wraps up the current state of the system into the
    ;; current future and returns it to the work queue.  It then executes
    ;; the thunk.  If the current future is invoked the call to
    ;; SAVING-STATE is exited; when the thunk returns, the processor will
    ;; wait for new work to perform.
    ;;
    ;; Returns COMPLETED to the resumed task.  If the current future
    ;; cannot be locked, its state is not saved, but the thunk is still
    ;; run.

    (define (saving-state thunk)
      (WITH-STATE my-state
	(let ((my-future (current-future)))
	  (LOCKING-FUTURE my-future am-I-running?
	    (if am-I-running?
		(begin
		  (future-set! my-future FUTURE-PROCESS-SLOT my-state)
		  (future-set! my-future FUTURE-STATUS-SLOT 'RUNNABLE)
		  (put-work my-future)))))
	(set-current-future! 'STATE-SAVED)
	;; Drop this frame's reference to the saved state.
	(set! my-state)
	;; Run THUNK, then look for work, from within
	;; THE-ERROR-CONTINUATION rather than the saved state.
	(within-control-point the-error-continuation
			      (lambda ()
				(thunk)
				(next))))
      'COMPLETED)
    
    ;; PAUSE-EVERYTHING is used to make every processor but the caller
    ;; save its state and go quiescent.  The value returned by
    ;; Pause-Everything is a procedure which will put the work queue 
    ;; back to its initial state (modulo order of futures on the queue).
    ;;
    ;; The returned "pause object" takes one optional message:
    ;;   Restart-tasks (default)  requeue the paused futures; the
    ;;                            object may be used this way only once
    ;;   Any-Tasks?               true if there are futures to restart
    ;;   The-Tasks                the list of paused futures, or '()
    ;;                            after a restart
    ;; Any other message signals an error.  When futures are off, the
    ;; result is a pause object holding no tasks.

    (DEFINE-ATOMIC (pause-everything)

      ;; RELEASE-STATE! takes a list of futures and puts them
      ;; on the work queue.  A future is requeued only if it can be
      ;; locked, has a resumable process, and is still PAUSED.
      (define (release-state! list)
	(if (null? list)
	    'RESTARTED
	    (let ((work-unit (car list)))
	      (LOCKING-FUTURE work-unit work-to-do?
		(if (and work-to-do?
			 (legitimate-process?
			  (future-ref work-unit FUTURE-PROCESS-SLOT))
			 (eq? (future-ref work-unit FUTURE-STATUS-SLOT)
			      'PAUSED))
		    (more-work work-unit)))
	      (release-state! (cdr list)))))
      
      ;; WEAK-LIST->LIST! takes a weak list of futures, as
      ;; returned by DRAIN-WORK-QUEUE! and converts it to a list of
      ;; the objects referenced.  The GC code needs the weak form,
      ;; hence the extra work here.  In the process, each future is
      ;; made to be PAUSED so it will automatically resume if touched.
      ;; Entries that cannot be locked are dropped.  The result is in
      ;; reverse order.  The recursive call is made from inside the
      ;; locked section, so each future stays locked until the rest of
      ;; the list has been processed.

      (define (weak-list->list! weak-list)
	(let loop ((current weak-list)
		   (result '()))
	  (if (null? current)
	      result
	      (let ((work-unit (weak-car current)))
		(LOCKING-FUTURE work-unit work-to-do?
		  (if work-to-do?
		      (begin
			(future-set! work-unit FUTURE-STATUS-SLOT 'PAUSED)
			(loop (weak-cdr current)
			      (cons work-unit result)))
		      (loop (weak-cdr current) result)))))))
      
      ;; RETURNED-OBJECT builds the pause object.  THE-QUEUE is the
      ;; list of paused futures; it is replaced by #!true once the
      ;; tasks have been restarted.
      (define ((returned-object the-queue) #!optional message)
	(if (unassigned? message) (set! message 'Restart-tasks))
	(cond ((eq? message 'Any-Tasks?)
	       (and (not (eq? the-queue #!true))
		    (not (null? the-queue))))
	      ((eq? message 'Restart-tasks)
	       (if (not (eq? the-queue #!true))
		   (release-state! the-queue)
		   (error "Attempt to re-use a pause object!"))
	       (set! the-queue #!true))
	      ((eq? message 'The-Tasks)
	       (if (eq? the-queue #!true)
		   '()
		   the-queue))
	      (else (error "Pause object: strange message" message))))

      (if (not (Futures-On?))
	  (returned-object '())
	  ;; Three synchronizers are shared between the caller and the
	  ;; interrupt handler, which both await them in the same
	  ;; order: SAVE (before state is saved), DRAIN (before the
	  ;; queue is drained) and PROCEED (after it has been).
	  (let ((save-synch (make-synchronizer))
		(drain-synch (make-synchronizer))
		(proceed-synch (make-synchronizer)))
	    (stop-preempting)
	    (global-interrupt
	     1
	     (lambda (int-code int-mask)
	       (await-synchrony save-synch)
	       (saving-state
		(lambda ()
		  (set-interrupt-enables! int-mask)
		  (await-synchrony drain-synch)
		  (await-synchrony proceed-synch))))
	     (lambda () #!TRUE))
	    (await-synchrony save-synch)
	    (await-synchrony drain-synch)
	    (let ((me (current-future))
		  (the-queue (weak-list->list! (drain-work-queue!))))
	      ;; Mark every interpreter PAUSED, then restore the
	      ;; caller's own entry.
	      (set! Current-Future-Vector (vector-cons (N-Interpreters) 'PAUSED))
	      (Set-Current-Future! me)
	      (await-synchrony proceed-synch)
	      (returned-object the-queue)))))
    
    ;; WITH-TASKS-SUSPENDED executes the thunk with all other processes
    ;; stopped. It returns the value of the thunk.
    ;;
    ;; When futures are off the thunk is simply called.  Otherwise
    ;; THE-PAUSED-TASKS is fluidly bound to the pause object returned
    ;; by PAUSE-EVERYTHING, and DYNAMIC-WIND announces the suspension
    ;; on entry and, on exit, either discards the tasks (if
    ;; DISCARD-THE-PAUSED-TASKS? is set) or restarts them.  Nothing is
    ;; printed when there were no tasks to suspend.

    (define (with-tasks-suspended thunk)
      (if (not (Futures-On?))
	  (thunk)
	  (fluid-let ((the-paused-tasks (pause-everything)))
	    (dynamic-wind
	     (lambda ()
	       (if (the-paused-tasks 'any-tasks?)
		   (begin
		     (newline)
		     (display "[Suspending tasks]"))))
	     thunk
	     (lambda ()
	       (cond ((not (the-paused-tasks 'any-tasks?)) '())
		     (discard-the-paused-tasks?
		      (newline) (display "[Discarding tasks]") (newline))
		     (else
		      (newline) (display "[Resuming tasks]") (newline)
		      (the-paused-tasks 'Restart-tasks))))))))

    ;; Dealing with recently suspended tasks
    ;;
    ;; These two procedures set and clear DISCARD-THE-PAUSED-TASKS?,
    ;; the flag WITH-TASKS-SUSPENDED consults on exit to decide whether
    ;; the paused tasks are thrown away or restarted.

    (define (discard-recently-suspended-tasks!)
      (set! discard-the-paused-tasks? #!true))

    (define (prevent-discarding-processes!) 
      (set! discard-the-paused-tasks? #!false))
    
    ;; Execution within a selected task
    ;;
    ;; WITHIN-PROCESS arranges for THUNK to be run as part of the
    ;; process FUTURE.
    ;;   - If FUTURE is the current process, THUNK is called directly.
    ;;   - If FUTURE is RUNNING elsewhere, the first attempt enters a
    ;;     breakpoint and the attempt is then retried, silently, until
    ;;     the status changes.
    ;;   - Otherwise FUTURE's saved process is wrapped so that THUNK
    ;;     runs first and the original process is then resumed with GO;
    ;;     FUTURE is queued and run on this processor.
    ;;   - If FUTURE cannot be locked, an error is signalled.
    ;; As in RUN, the locked section only chooses a procedure of no
    ;; arguments; it is called after the lock has been released.

    (define (within-process future thunk)
      (define (loop noisy?)
	((LOCKING-FUTURE future true-future?
	   (if true-future?
	       (let ((status (future-ref future FUTURE-STATUS-SLOT))
		     (process (future-ref future FUTURE-PROCESS-SLOT)))
		 (cond ((non-touching-eq? future (current-future))
			thunk)
		       ((eq? status 'RUNNING)
			(lambda ()
			  (if noisy?
			      (bkpt "WITHIN-PROCESS: process is running"))
			  (loop #!false)))
		       (else
			(future-set! future FUTURE-PROCESS-SLOT
				     (lambda (arg) (thunk) (process 'go)))
			(more-work future)
			(lambda () (run future)))))
	       (begin
		 (error "WITHIN-PROCESS: Not a process" future)
		 (lambda () (next)))))))
      (loop #!true))
    ))					; end of Make-Environment for Scheduler


; Export definitions to the world outside the scheduler
;
; Each global name below is bound to the procedure of the same name
; inside the SCHEDULER environment, with one exception: the global NEXT
; is bound to the scheduler's RESCHEDULE (which gives up the processor
; but keeps the caller runnable), not to the scheduler's internal NEXT.

(define initialize-scheduler! (access initialize-scheduler! scheduler))
(define determine! (access determine! scheduler))
(define future-scheduler (access future-scheduler scheduler))
(define dfuture-scheduler (access dfuture-scheduler scheduler))
(define delay-scheduler (access delay-scheduler scheduler))
(define next (access reschedule scheduler))
(define wait-until-idle (access wait-until-idle scheduler))
(define pause-everything (access pause-everything scheduler))
(define with-tasks-suspended (access with-tasks-suspended scheduler))
(define discard-recently-suspended-tasks!
  (access discard-recently-suspended-tasks! scheduler))
(define prevent-discarding-processes!
  (access prevent-discarding-processes! scheduler))
(define Current-Future (access Current-Future scheduler))
(define Futures-On? (access Futures-On? scheduler))
(define Futures-Off (access Futures-Off scheduler))
(define Saving-State (access Saving-State scheduler))
(define within-process (access within-process scheduler))
(define Disjoin (access disjoin scheduler))
(define Await-First-Of (access await-first-of scheduler))

;; FUTURES-ON starts (or restarts) the futures system with
;; DFUTURE-SCHEDULER as the creation scheduler.  SLICE is the
;; scheduling interval handed to INITIALIZE-SCHEDULER!; when it is
;; omitted, '() is passed instead.
(define (futures-on #!optional slice)
  (initialize-scheduler! (if (unassigned? slice) '() slice)
			 dfuture-scheduler))

;; NON-TOUCHING-MEMQ is MEMQ with NON-TOUCHING-EQ? as the comparison:
;; it returns the first tail of LIST whose car matches ELEMENT, or
;; #!false if no member matches.
(define (non-touching-memq element list)
  (let scan ((remaining list))
    (if (null? remaining)
	#!false
	(if (non-touching-eq? element (car remaining))
	    remaining
	    (scan (cdr remaining))))))

;; NON-TOUCHING-ASSQ is ASSQ with NON-TOUCHING-EQ? as the comparison:
;; it returns the first entry of the association list LIST whose key
;; (its car) matches ELEMENT, or #!false if there is none.
(define (non-touching-assq element list)
  (let scan ((entries list))
    (if (null? entries)
	#!false
	(if (non-touching-eq? element (caar entries))
	    (car entries)
	    (scan (cdr entries))))))

;; The FUTURE special form:  (FUTURE expression [doc [user-scheduler]])
;;
;; Expands into a call to the scheduler's SPAWN-PROCESS with
;;   - a thunk that evaluates EXPRESSION,
;;   - DOC, or, when DOC is omitted, the printed text of EXPRESSION
;;     (computed at expansion time), and
;;   - USER-SCHEDULER as the creation scheduler, if one was given.
;; The value of the form is the future returned by SPAWN-PROCESS.
(add-syntax! 'future
   (macro (expression #!optional doc user-scheduler)
     `((ACCESS SPAWN-PROCESS SCHEDULER)
       (LAMBDA () ,expression)		   ; Work to do
       ,(if (unassigned? doc)		   ; Documentation
	    (with-output-to-string
	     (lambda () (display expression)))
	    doc)
       ,@(if (unassigned? user-scheduler)  ; Start-up procedure
	     '()
	     `(,user-scheduler)))))

;; Turn the futures system on as soon as this file is loaded.  No slice
;; is given, so DELTA is set to '() and no preemption timer is started.
(futures-on)