Skip to content
Snippets Groups Projects
Commit c1921d5d authored by Francois-Rene Rideau's avatar Francois-Rene Rideau
Browse files

Use lparallel for thread pooling instead of chanl.

parent c36af4bd
Branches
No related tags found
No related merge requests found
README 0 → 100644
quux-hunchentoot
================
TODO
----
Have an acceptor that listens on existing connections
so that scheduled work is done per-request as opposed to per-connection.
......@@ -2,7 +2,7 @@
(:use :common-lisp
:hunchentoot
:alexandria
:chanl ;; for exchanging messages
:lparallel ;; for exchanging messages
:optima ;; for matching inter-thread messages
:classy) ;; for its >simple-fifo-queue<
(:import-from
......@@ -11,6 +11,10 @@
#:*default-max-thread-count*
#:*hunchentoot-stream*
#:acceptor-process
#:decrement-taskmaster-accept-count
#:increment-taskmaster-accept-count
#:decrement-taskmaster-thread-count
#:increment-taskmaster-thread-count
#:handler-case*
#:send-service-unavailable-reply)
(:export
......
(defsystem quux-hunchentoot
:depends-on (:hunchentoot ;; we extend it -- we depend on hunchentoot 1.2.15 + patch
;; see https://github.com/edicl/hunchentoot/pull/52
:depends-on ((:version :hunchentoot "1.2.17")
:alexandria ;; for various utilities
#-asdf3 :asdf-driver ;; for various utilities
:bordeaux-threads ;;
#-asdf3 :uiop ;; for various utilities
:bordeaux-threads ;; for threads
:lil ;; for FIFO queues
:chanl ;; for communication channels
:lparallel ;; for communication channels
:optima) ;; for parsing messages sent over channels
:components
((:file "pkgdcl")
......
(in-package :quux-hunchentoot)
(defun make-channel ()
"Create a communication channel between threads in our pool"
(make-instance 'unbounded-channel))
;; This taskmaster takes threads out of a worker thread pool.
;; Workers have to register themselves whenever they are ready
;; to process one request, and register again when they are ready again.
......@@ -13,36 +9,39 @@
;;
(defclass thread-pooling-taskmaster (multi-threaded-taskmaster)
((master-lock
;; controls status, acceptor-process, and
;; controls dispatcher-process, acceptor-process, and
;; all slots that are valid while there is or may be no dispatcher-process to process messages.
:initform (bt:make-lock "taskmaster-master")
:reader taskmaster-master-lock
:documentation
"Thread-unsafe operations without a clear owner use this lock")
(status ;; must hold the master lock to change while stopped; owned by the dispatcher while running or stopping.
:accessor taskmaster-status
:initform :stopped
:documentation
"The status of the taskmaster: :STOPPED, :RUNNING, :STOPPING.")
(dispatcher-process ;; must hold the master lock to change
:accessor dispatcher-process
:accessor taskmaster-dispatcher-process
:initform nil
:documentation
"A process that dispatches connections to worker processes for handling,
or withholds them when resources are missing.")
(dispatcher-channel ;; receiving owned by the dispatcher. Anyone can send.
:initarg :dispatcher-channel
:initform (make-channel)
:accessor dispatcher-channel)
(available-workers ;; owned by the dispatcher.
:documentation "A list of channels, each to speak to an available worker"
:initarg :available-workers
:initform (empty-fifo-queue)
:accessor taskmaster-available-workers)
(busy-workers ;; owned by the dispatcher.
:documentation "A set of busy workers"
:initarg :busy-workers
:initform (make-hash-table :test 'equal)
:accessor taskmaster-busy-workers)
:initform nil
:accessor taskmaster-dispatcher-channel)
(context ;; must only change before the kernel is started
:accessor taskmaster-context
:initarg :context
:initform 'funcall
:documentation
"A context function, taking a thunk as argument, and calling it within proper context,
for workers in the thread pool.")
(bindings ;; must only change before the kernel is started
:accessor taskmaster-bindings
:initform nil
:documentation
"bindings (as an alist) to wrap around workers in the thread pool.")
(thread-pool ;; must hold the master lock to change
:initform nil
:accessor taskmaster-thread-pool
:documentation
"A kernel to which to bind lparallel:*kernel* to handle the thread pool.")
(pending-connections ;; owned by the dispatcher.
:documentation "A list of pending connection socket"
:initarg :pending-connections
......@@ -83,12 +82,7 @@
:documentation
"The number of connection currently accepted by the taskmaster. These
connections are not ensured to be processed, they may be waiting for an
empty processing slot or rejected because the load is too heavy.")
(worker-thread-name-format ;; must only be modified while stopped.
:type (or string null)
:initarg :worker-thread-name-format
:initform "hunchentoot-worker-~A"
:accessor taskmaster-worker-thread-name-format))
empty processing slot or rejected because the load is too heavy."))
(:default-initargs
:max-thread-count *default-max-thread-count*
:max-accept-count *default-max-accept-count*)
......@@ -121,11 +115,25 @@ MAX-ACCEPT-COUNT. This will cause a server that's almost out of
resources to wait a bit; if the server is completely out of resources,
then the reply will be HTTP 503."))
(defun call-with-thread-pool (taskmaster thunk)
(let ((*kernel* (taskmaster-thread-pool taskmaster)))
(unless *kernel*
(error "Thread pool not active"))
(funcall thunk)))
(defmacro with-thread-pool ((taskmaster) &body body)
`(call-with-thread-pool ,taskmaster #'(lambda () ,@body)))
(defmacro with-taskmaster-accessors (slots taskmaster &body body)
`(with-accessors
,(loop for slot in slots
collect `(,slot ,(format-symbol :quux-hunchentoot "~A-~A" 'taskmaster slot)))
,taskmaster ,@body))
(defmethod initialize-instance :after ((taskmaster thread-pooling-taskmaster) &rest init-args)
"Ensure the if MAX-ACCEPT-COUNT is supplied, that it is greater than MAX-THREAD-COUNT."
(declare (ignore init-args))
(with-accessors ((max-accept-count taskmaster-max-accept-count)
(max-thread-count taskmaster-max-thread-count)) taskmaster
(with-taskmaster-accessors (max-accept-count max-thread-count) taskmaster
(when max-accept-count
(unless max-thread-count
(parameter-error "MAX-THREAD-COUNT must be non-NIL if MAX-ACCEPT-COUNT is non-NIL (was ~D)"
......@@ -134,107 +142,84 @@ then the reply will be HTTP 503."))
(parameter-error "MAX-ACCEPT-COUNT must be greater than MAX-THREAD-COUNT, but ~D <= ~D"
max-accept-count max-thread-count)))))
(defmethod hunchentoot::decrement-taskmaster-accept-count ((taskmaster thread-pooling-taskmaster))
;; Compatibility function so we can reuse hunchentoot:send-service-unavailable-reply
(values))
(defgeneric ensure-dispatcher-process (taskmaster))
(defmethod ensure-dispatcher-process ((taskmaster thread-pooling-taskmaster))
(with-taskmaster-accessors (dispatcher-process) taskmaster
(assert (eq dispatcher-process (bt:current-thread)))))
(defmethod decrement-taskmaster-accept-count ((taskmaster thread-pooling-taskmaster))
(ensure-dispatcher-process taskmaster)
(decf (taskmaster-accept-count taskmaster)))
(defmethod increment-taskmaster-accept-count ((taskmaster thread-pooling-taskmaster))
(ensure-dispatcher-process taskmaster)
(incf (taskmaster-accept-count taskmaster)))
(defmethod decrement-taskmaster-thread-count ((taskmaster thread-pooling-taskmaster))
(ensure-dispatcher-process taskmaster)
(decf (taskmaster-thread-count taskmaster)))
(defmethod increment-taskmaster-thread-count ((taskmaster thread-pooling-taskmaster))
(ensure-dispatcher-process taskmaster)
(incf (taskmaster-thread-count taskmaster)))
(defmethod shutdown ((taskmaster thread-pooling-taskmaster))
;; just wait until all workers are done, send them all a shutdown message, then die.
(bt:with-lock-held ((taskmaster-master-lock taskmaster))
(ecase (taskmaster-status taskmaster)
((:stopped :stopping)) ;; no active dispatcher to receive a message
((:running) (dispatcher-send taskmaster `(:shutdown) :blockp nil)))))
(with-taskmaster-accessors (master-lock dispatcher-process) taskmaster
(bt:with-lock-held (master-lock)
(when dispatcher-process
(dispatcher-send taskmaster '(:shutdown))))))
;; NB: by using the send and recv gf's, we provide a specialization point.
(defgeneric dispatcher-send (taskmaster message &key &allow-other-keys)
(:method ((taskmaster thread-pooling-taskmaster) message &rest keys &key &allow-other-keys)
(apply 'send (dispatcher-channel taskmaster) message keys)))
(:method ((taskmaster thread-pooling-taskmaster) message &key &allow-other-keys)
(lparallel.queue:push-queue
message
(lparallel.kernel::channel-queue (taskmaster-dispatcher-channel taskmaster)))))
(defgeneric dispatcher-recv (taskmaster &key &allow-other-keys)
(:method ((taskmaster thread-pooling-taskmaster) &rest keys &key &allow-other-keys)
(apply 'recv (dispatcher-channel taskmaster) keys)))
(:method ((taskmaster thread-pooling-taskmaster) &key &allow-other-keys)
(receive-result (taskmaster-dispatcher-channel taskmaster))))
(defmethod execute-acceptor ((taskmaster thread-pooling-taskmaster))
(bt:with-lock-held ((taskmaster-master-lock taskmaster))
(when (eq (taskmaster-status taskmaster) :stopped)
(setf (taskmaster-status taskmaster) :running)
(with-taskmaster-accessors
(master-lock
max-accept-count max-thread-count
acceptor
dispatcher-process dispatcher-channel
context bindings thread-pool)
taskmaster
(bt:with-lock-held (master-lock)
(let ((address (or (acceptor-address acceptor) "*"))
(port (acceptor-port acceptor)))
(assert (null thread-pool))
(assert (null dispatcher-process))
(assert (null dispatcher-channel))
(setf thread-pool
(make-kernel (or max-thread-count most-positive-fixnum)
:name (format nil "quux-hunchentoot-thread-pool-~A:~A" address port)
:context context :bindings bindings))
(with-thread-pool (taskmaster)
(setf dispatcher-channel (make-channel))
(setf (acceptor-process taskmaster)
(start-thread
taskmaster
(lambda () (accept-connections (taskmaster-acceptor taskmaster)))
:name (format nil "hunchentoot-listener-~A:~A"
(or (acceptor-address (taskmaster-acceptor taskmaster)) "*")
(acceptor-port (taskmaster-acceptor taskmaster)))))
(setf (dispatcher-process taskmaster)
:name (format nil "quux-hunchentoot-listener-~A:~A" address port)))
(setf dispatcher-process
(start-thread
taskmaster
(lambda () (run-dispatcher-thread taskmaster))
:name (format nil "hunchentoot-dispatcher-~A:~A"
(or (acceptor-address (taskmaster-acceptor taskmaster)) "*")
(acceptor-port (taskmaster-acceptor taskmaster))))))))
:name (format nil "quux-hunchentoot-dispatcher-~A:~A" address port))))))))
(defmethod handle-incoming-connection ((taskmaster thread-pooling-taskmaster) connection)
(dispatcher-send taskmaster `(:process-connection ,connection) :blockp nil))
(defun mark-worker-ready (taskmaster worker-id chan)
;; POST: the worker has been removed from the busy-workers and pushed onto the available-workers
(when (gethash worker-id (taskmaster-busy-workers taskmaster))
(remhash worker-id (taskmaster-busy-workers taskmaster))
(decf (taskmaster-accept-count taskmaster)))
(enqueue (taskmaster-available-workers taskmaster) `(:worker ,worker-id ,chan))
(values))
(defun mark-worker-busy (taskmaster worker-id connection &optional chan)
;; PRE: the worker has been removed from the available-workers
;; POST: the worker is added to the busy-workers
(setf (gethash worker-id (taskmaster-busy-workers taskmaster)) (list connection chan))
(values))
(defun get-worker-busy-on-connection (taskmaster worker-id channel connection)
;; POST: the worker is added to the busy-workers
(send channel `(:process-connection ,connection) :blockp nil)
(mark-worker-busy taskmaster worker-id connection channel))
(defmethod too-many-taskmaster-requests ((taskmaster thread-pooling-taskmaster) connection)
(declare (ignore connection))
(acceptor-log-message (taskmaster-acceptor taskmaster)
:warning "Can't handle a new request, too many request threads already"))
(defgeneric run-worker-thread (taskmaster worker-id channel))
(defmethod run-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id channel)
;; NB: define :before methods to initialize any application-specific worker thread state
(loop for request = (recv channel :blockp t)
do (ematch request
((list :process-connection connection)
(process-connection (taskmaster-acceptor taskmaster) connection))
((list :shutdown)
(return)))
(dispatcher-send taskmaster `(:worker-ready ,worker-id ,channel) :blockp nil)))
(defgeneric next-worker-id (taskmaster))
(defmethod next-worker-id ((taskmaster thread-pooling-taskmaster))
(incf (taskmaster-thread-count taskmaster)))
(defgeneric create-worker-thread (taskmaster))
(defmethod create-worker-thread ((taskmaster thread-pooling-taskmaster))
"Create a worker thread for handling requests"
;; we are handling all conditions here as we want to make sure that
;; the acceptor process never crashes while trying to create a
;; worker thread; one such problem exists in
;; GET-PEER-ADDRESS-AND-PORT which can signal socket conditions on
;; some platforms in certain situations.
(let ((worker-id (next-worker-id taskmaster))
(channel (make-channel)))
(handler-case*
(start-thread
taskmaster
(lambda () (run-worker-thread taskmaster worker-id channel))
:name (format nil (taskmaster-worker-thread-name-format taskmaster) worker-id))
(error (condition)
(let ((*acceptor* (taskmaster-acceptor taskmaster)))
(log-message* *lisp-errors-log-level*
"Error while creating worker thread: ~A" condition))))
(values worker-id channel)))
(defun work-on-connection (taskmaster connection)
`(:worker-done ,(process-connection (taskmaster-acceptor taskmaster) connection)))
(defgeneric run-dispatcher-thread (taskmaster))
......@@ -247,63 +232,43 @@ then the reply will be HTTP 503."))
;; - Otherwise if we're between MAX-THREAD-COUNT and MAX-ACCEPT-COUNT,
;; wait until the connection count drops, then handle the request
;; - Otherwise, increment THREAD-COUNT and start a taskmaster
(with-accessors ((master-lock taskmaster-master-lock)
(dispatcher-process dispatcher-process)
(taskmaster-status taskmaster-status)
(pending-connections taskmaster-pending-connections)
(available-workers taskmaster-available-workers)
(busy-workers taskmaster-busy-workers)
(accept-count taskmaster-accept-count)
(thread-count taskmaster-thread-count)
(max-accept-count taskmaster-max-accept-count)
(max-thread-count taskmaster-max-thread-count))
taskmaster
(bt:with-lock-held (master-lock)
(assert (eq taskmaster-status :running))
(assert (eq dispatcher-process (bt:current-thread))))
(with-taskmaster-accessors
(master-lock thread-pool pending-connections
dispatcher-channel dispatcher-process
accept-count max-accept-count
thread-count max-thread-count) taskmaster
(ensure-dispatcher-process taskmaster)
(with-thread-pool (taskmaster)
(loop
(ematch (dispatcher-recv taskmaster :blockp t)
((list :worker-ready worker-id chan)
(mark-worker-ready taskmaster worker-id chan))
;;; Process one message
(ematch (dispatcher-recv taskmaster)
((list :worker-done _)
(decrement-taskmaster-accept-count taskmaster)
(decrement-taskmaster-thread-count taskmaster))
((list :process-connection connection)
(enqueue pending-connections connection)
(incf accept-count))
(increment-taskmaster-accept-count taskmaster))
((list :shutdown)
;; NB: hunchentoot is supposed to stop the acceptor before the taskmaster
(setf taskmaster-status :stopping)))
(ecase taskmaster-status
(:stopping
(dolist (worker (dequeue-all available-workers))
(ematch worker
((list :worker worker-id channel)
(declare (ignorable worker-id))
(send channel '(:shutdown) :blockp nil))))
(dolist (connection (dequeue-all pending-connections))
(too-many-taskmaster-requests taskmaster connection)
(send-service-unavailable-reply taskmaster connection))
(when (empty-p busy-workers)
(bt:with-lock-held ((taskmaster-master-lock taskmaster))
(setf taskmaster-status :stopped)
(setf (dispatcher-process taskmaster) nil))
(return)))
(:running
(end-kernel :wait t)
(setf thread-pool nil
dispatcher-channel nil
dispatcher-process nil
(acceptor-process taskmaster) nil)))
;;; Do whatever work we can, while we can
(loop
(cond
;; nothing to do? wait for more work!
((empty-p pending-connections)
(return))
;; worker available? give him the job!
((not (empty-p available-workers))
(ematch (dequeue available-workers)
((list :worker worker-id channel)
(get-worker-busy-on-connection
taskmaster worker-id channel (dequeue pending-connections)))))
;; positions available for more workers? hire a new one for the job!
;; thread available? put it on the job!
((or (null max-thread-count) (< thread-count max-thread-count))
(multiple-value-bind (worker-id channel)
(create-worker-thread taskmaster)
(get-worker-busy-on-connection
taskmaster worker-id channel (dequeue pending-connections))))
(increment-taskmaster-thread-count taskmaster)
(submit-task dispatcher-channel
#'work-on-connection taskmaster (dequeue pending-connections)))
;; Already trying to handle too many connections? Deny request with 503.
((etypecase max-accept-count
(integer
......@@ -314,10 +279,11 @@ then the reply will be HTTP 503."))
nil))
(let ((connection (dequeue pending-connections)))
(too-many-taskmaster-requests taskmaster connection)
;; NB: the following decrements the accept-count
(send-service-unavailable-reply taskmaster connection)))
;; More connections than we are ready to process, but fewer than we are ready to accept?
;; Wait for some worker to become ready.
((and max-accept-count (>= thread-count max-thread-count))
(return))
(t
(error "WTF?")))))))))
(error "WTF?"))))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment