module digger_web;

import std.algorithm;
import std.array;
import std.conv;
import std.datetime;
import std.exception;
import std.functional;
import std.path;
import std.process;
import std.string;

static if(!is(typeof({import ae.net.asockets;}))) static assert(false, "ae library not found, did you clone with --recursive?"); else:

import ae.net.asockets;
import ae.net.http.client;
import ae.net.http.responseex;
import ae.net.http.server;
import ae.net.shutdown;
import ae.sys.cmd;
import ae.sys.timing;
import ae.utils.aa;
import ae.utils.funopt;
import ae.utils.main;
import ae.utils.meta : isDebug;

import common;

// http://d.puremagic.com/issues/show_bug.cgi?id=7016
version(Windows) static import ae.sys.windows;

// http://d.puremagic.com/issues/show_bug.cgi?id=12481
alias pipe = std.process.pipe;

// ***************************************************************************

/// HTTP front end: owns the listening server and dispatches each
/// incoming request to the matching action.
class WebFrontend
{
    HttpServer httpd;
    /// Port actually bound, as returned by `httpd.listen`.
    ushort port;

    /// Start listening on `host`:`port` and register a shutdown
    /// handler which closes the listener.
    this(string host, ushort port)
    {
        httpd = new HttpServer();
        httpd.handleRequest = &onRequest;
        this.port = httpd.listen(port, host);

        addShutdownHandler(&httpd.close);
    }

    /// Handle one HTTP request.
    ///
    /// The first path segment selects the action. Any `Exception`
    /// thrown while handling the request is reported to the client
    /// as an Internal Server Error response carrying the exception
    /// message.
    void onRequest(HttpRequest request, HttpServerConnection conn)
    {
        HttpResponseEx resp = new HttpResponseEx();
        resp.disableCache();

        try
        {
            string resource = request.resource;
            auto queryTuple = resource.findSplit("?");
            resource = queryTuple[0];

            // Validate before any indexing/slicing below: a resource
            // which does not begin with "/" (e.g. "*" or an empty
            // string) would otherwise make `segments[0]` or
            // `resource[1..$]` fail with a range violation, which is
            // not an Exception and would escape the catch block.
            enforce(resource.startsWith("/"), "Invalid resource");

            auto params = decodeUrlParameters(queryTuple[2]);
            // Drop the empty segment preceding the leading slash.
            auto segments = resource.split("/")[1..$];

            switch (segments[0])
            {
                case "initialize":
                case "begin":
                case "merge":
                case "unmerge":
                case "merge-fork":
                case "unmerge-fork":
                case "build":
                case "install-preview":
                case "install":
                {
                    // Turn non-empty query parameters into
                    // "--key=value" command-line switches.
                    auto paramsArray = params
                        .pairs
                        .filter!(pair => !pair.value.empty)
                        .map!(pair => "--" ~ pair.key ~ "=" ~ pair.value)
                        .array;
                    startTask(segments ~ paramsArray);
                    return conn.sendResponse(resp.serveJson("OK"));
                }
                case "pull-state.json":
                    // Proxy request to GHDaemon, a daemon which caches GitHub
                    // pull request test results. Required to avoid GitHub API
                    // throttling without exposing a secret authentication token.
                    // https://github.com/CyberShadow/GHDaemon
                    // (The resource was already validated to start with "/".)
                    return httpRequest(
                        new HttpRequest("http://ghdaemon.k3.1azy.net" ~ resource),
                        (HttpResponse response, string disconnectReason)
                        {
                            // NOTE(review): `response` is presumably null when the
                            // upstream request fails; confirm sendResponse copes
                            // with that, and consider reporting disconnectReason.
                            if (conn.conn.state == ConnectionState.connected)
                                conn.sendResponse(response);
                        }
                    );
                case "refs.json":
                {
                    struct Refs { string[] branches, tags; }
                    auto refs = Refs(
                        diggerQuery("branches"),
                        diggerQuery("tags"),
                    );
                    return conn.sendResponse(resp.serveJson(refs));
                }
                case "status.json":
                {
                    enforce(currentTask, "No task was started");

                    struct Status
                    {
                        immutable(Task.OutputLine)[] lines;
                        string state;
                    }
                    Status status;
                    // Only lines collected since the previous flush are sent.
                    status.lines = currentTask.flushLines();
                    // Keep only the part after the last "." of the state's text form.
                    status.state = text(currentTask.getState()).split(".")[$-1];
                    return conn.sendResponse(resp.serveJson(status));
                }
                case "ping":
                    // Feeds the watchdog (see watchdog()).
                    lastPing = Clock.currTime;
                    return conn.sendResponse(resp.serveJson("OK"));
                case "exit":
                    log("Exit requested.");
                    shutdown();
                    return conn.sendResponse(resp.serveJson("OK"));
                default:
                    // Anything else is served as a file below "web/".
                    return conn.sendResponse(resp.serveFile(resource[1..$], "web/"));
            }
        }
        catch (Exception e)
        {
            resp.serveText(e.msg);
            resp.setStatus(HttpStatusCode.InternalServerError);
            conn.sendResponse(resp);
        }
    }
}

// ***************************************************************************

/// Time of the most recent "ping" request; `SysTime.init` until
/// the first one arrives.
SysTime lastPing;

/// Maximum time allowed between pings before the watchdog shuts
/// the program down.
enum watchdogTimeout = 5.seconds;

/// Periodic check which shuts the program down once pings have
/// stopped arriving for longer than `watchdogTimeout`. Does nothing
/// until the first ping has been received, and stops rescheduling
/// itself once `exiting` is set.
void watchdog()
{
    if (exiting)
        return;
    if (lastPing != SysTime.init && Clock.currTime - lastPing > watchdogTimeout)
    {
        log("No ping request in %s, exiting".format(watchdogTimeout));
        return shutdown();
    }
    // Re-arm: check again in 100 ms.
    setTimeout(toDelegate(&watchdog), 100.msecs);
}

/// Set by a shutdown handler; tells the watchdog to stop rescheduling.
bool exiting;

/// Register the shutdown hook for `exiting` and start the watchdog
/// (the watchdog is not started in debug(ASOCKETS) builds).
void startWatchdog()
{
    addShutdownHandler({ exiting = true; });
    debug(ASOCKETS) {} else
        watchdog();
}

// ***************************************************************************

/// A running (or finished) `digger do ...` child process together
/// with the output lines captured from it.
class Task
{
    enum State { none, running, error, complete }

    /// One line read from the child process.
    struct OutputLine
    {
        string text;
        bool error; /// true if the line was read from the stderr pipe
    }

    /// Spawn `digger do <args>` and start two daemon threads which
    /// collect its standard output and standard error line by line.
    this(string[] args...)
    {
        import core.thread;

        // Start a daemon thread which reads lines from `pipe` and
        // appends them to `lines`, tagged with `error`.
        void pipeLines(Pipe pipe, bool error)
        {
            // Copy the File to heap
            auto f = [pipe.readEnd].ptr;

            void run()
            {
                while (!f.eof)
                {
                    auto s = f.readln();
                    if (s.length == 0)
                    {
                        // Nothing read; back off briefly before retrying.
                        Thread.sleep(1.msecs);
                        continue;
                    }
                    // Prefix is "O" for output lines and "E" for error lines.
                    writeToConsole("%s: %s".format("OE"[error], s));
                    synchronized(this)
                        lines ~= OutputLine(s, error);
                }
                f.close();
            }

            auto t = new Thread(&run);
            t.isDaemon = true;
            t.start();
        }

        auto outPipe = pipe();
        auto errPipe = pipe();

        pipeLines(outPipe, false);
        pipeLines(errPipe, true );

        import std.stdio : stdin;
        pid = spawnProcess(
            [absolutePath("digger"), "do"] ~ args,
            stdin,
            outPipe.writeEnd,
            errPipe.writeEnd,
            null,
            isDebug ? Config.none : Config.suppressConsole,
        );
    }

    /// Poll the child process without blocking: `running` while it has
    /// not terminated, `complete` on exit status 0, `error` otherwise.
    State getState()
    {
        auto result = pid.tryWait();
        if (result.terminated)
        {
            if (result.status == 0)
                return State.complete;
            else
                return State.error;
        }
        else
            return State.running;
    }

    /// Return the lines collected so far and reset the buffer, so
    /// each line is handed out only once.
    immutable(OutputLine)[] flushLines()
    {
        synchronized(this)
        {
            auto result = lines;
            lines = null;
            return result;
        }
    }

private:
    Pid pid;
    immutable(OutputLine)[] lines;
}

/// The most recently started task, or null if none was started yet.
Task currentTask;

/// True if a task was started and its process has not terminated.
bool taskRunning()
{
    return currentTask && currentTask.getState() == Task.State.running;
}

/// Start a new task with the given arguments.
/// Throws: Exception if a task is still running.
void startTask(string[] args...)
{
    enforce(!taskRunning(), "A task is already running");
    currentTask = new Task(args);
}

shared static this()
{
    // On shutdown, block until a still-running task has finished.
    addShutdownHandler({
        if (taskRunning())
        {
            log("Waiting for current task to finish...");
            currentTask.pid.wait();
            log("Task finished, exiting.");
        }
    });
}

// ***************************************************************************

/// Run `digger do <args>` via `query` and return its output split
/// into lines, without the lines that start with "digger: ".
string[] diggerQuery(string[] args...)
{
    return query([absolutePath("digger"), "do"] ~ args)
        .splitLines()
        // filter out log lines
        .filter!(line => !line.startsWith("digger: "))
        .array();
}

// ***************************************************************************

/// Try to figure out if this is a desktop machine
/// which can run a graphical web browser, or a
/// headless machine which can't.
/// Either open the URL directly, or just print it
/// and invite the user to do so themselves.
void showURL(string host, ushort port)
{
    // Address the user (or their browser) should visit.
    auto address = format("http://%s:%s/", host, port);

    // Windows and OS X builds always take the desktop branch;
    // elsewhere a non-empty DISPLAY environment variable decides.
    version (Windows)
        enum hasDesktop = true;
    else version (OSX)
        enum hasDesktop = true;
    else
        bool hasDesktop = environment.get("DISPLAY").length != 0;

    if (!hasDesktop)
    {
        // TODO: replace "localhost" with the server's hostname.
        log("To continue, please browse to: " ~ address);
    }
    else
    {
        import std.process;
        log("Opening URL: " ~ address);
        browse(address);
    }
}

// The single web front end instance, created by diggerWeb.
WebFrontend web;

// Program entry point (invoked through funopt): start the web
// front end, show its URL, start the watchdog, and run the
// socket event loop.
void diggerWeb(
    Option!(string, "Interface to listen on.\nDefault is \"localhost\" (local connections only).", "HOST") host = "localhost",
    Option!(ushort, "Port to listen on. Default is 0 (random unused port).") port = 0)
{
    // Adjust working directory: if there is no "web" directory here
    // but there is one next to the executable, switch to the
    // executable's directory.
    {
        import std.file;
        if (!exists("web"))
        {
            auto exeDir = dirName(thisExePath());
            if (exists(buildPath(exeDir, "web")))
                chdir(exeDir);
        }
    }

    web = new WebFrontend(host, port);

    showURL(host, web.port);

    startWatchdog();

    socketManager.loop();
}

mixin main!(funopt!diggerWeb);