From c7b38a6506b878b0f881c96c5977ed79e42259cb Mon Sep 17 00:00:00 2001 From: anwinged Date: Tue, 10 Apr 2012 07:28:46 +0000 Subject: [PATCH] Base integration with user apps --- manual/for_prog.tex | 20 ++++----- server.py | 98 ++++++++++++++++++++++++++++++++------------- task.py | 31 ++++++++++---- tasks/testt.json | 19 ++++----- tasks/testt.py | 94 ++++++++++++++++++++++++++++++++++--------- 5 files changed, 190 insertions(+), 72 deletions(-) diff --git a/manual/for_prog.tex b/manual/for_prog.tex index c363556..8c1c1c5 100644 --- a/manual/for_prog.tex +++ b/manual/for_prog.tex @@ -95,18 +95,18 @@ ok где %user-id% и %task-id% обозначают уникальные идентификаторы клиента и задачи соответственно. О том, как они формируются и какую роль играют можно прочитать в последней части данного сочинения. Сервер на основе uid и tid перенаправляет запрос нужной задаче, если, конечно, в данный момент она способна его обработать. После чего высылает клиенту результат разпроса (к примеру): - { - “answer”: “ok”, - “code”: 0, - “value”: 0.6321, - “comment”: “in progress, all right” -} + { + “answer”: “ok”, + “code”: 0, + “value”: 0.6321, + “comment”: “in progress, all right” + } Если, например, клиент ошибся в tid или пытается получить доступ к чужой задаче, сервер даст ответ - { - “answer”: “error”, - “comment”: “access denied” - } + { + “answer”: “error”, + “comment”: “access denied” + } \section{Описание данных} diff --git a/server.py b/server.py index 5cd551b..109a5b4 100644 --- a/server.py +++ b/server.py @@ -54,8 +54,9 @@ class LocalServer: self.log = open('log.txt', 'w') self.WriteToLog('local server initialized') - worker = Worker(self.jobs_queue, self.queue_lock) - worker.start() + for i in xrange(2): + worker = Worker(self.jobs_queue, self.queue_lock) + worker.start() def Close(self): self.WriteToLog('local server closed\n') @@ -125,42 +126,50 @@ class LocalServer: with self.queue_lock: self.jobs_queue.append(job) WriteToLog('Job added') + return job #------------------------------------------------------------------------------- class Worker(threading.Thread): + number = 0 + def __init__(self, queue, lock): threading.Thread.__init__(self) self.queue = queue self.lock = lock self.daemon = True WriteToLog('worker started') + self.id = Worker.number + Worker.number += 1 - def FindNext(self): - result = None - for job in self.queue: - if job.GetState() == JOB_READY: - result = job - return result + def Cycle(self): + job = None + # найти следующее готовое к выполнению задание + with self.lock: + for j in self.queue: + if not j.IsBusy(): + job = j + job.SetBusy() + break + # и, если нашли, приступаем к выполнению + if job: + WriteToLog("{} started!".format(self.id)) + job.Start() + WriteToLog("{} finished!".format(self.id)) + else: + time.sleep(1) def run(self): while True: - job = None - # найти следующее готовое к выполнению задание - with self.lock: - job = self.FindNext() - # и, если нашли, приступаем к выполнению - if job: - job.Start() - else: - time.sleep(1) + self.Cycle() #------------------------------------------------------------------------------- JOB_READY = 0 -JOB_RUNNING = 1 -JOB_STOPPED = 2 -JOB_COMPLETED = 3 +JOB_BUSY = 1 +JOB_RUNNING = 2 +JOB_STOPPED = 3 +JOB_COMPLETED = 4 class Job: def __init__(self, taskd, datadump): @@ -168,16 +177,36 @@ class Job: self.datad = datadump self.state = JOB_READY self.percent = 0.0 + self.comment = '' self.result = None self.proc = None def ProcessMsg(self, msg): - WriteToLog(msg.strip()) + # разбираем полученный ответ + data = json.loads(msg) + # извлекаем оттуда ответ + ans = data['answer'] + # ответ получен ок или предупреждение + # записываем значение прогресса, если имеется + if ans == 'ok' or ans == 'warning': + self.percent = data.get('value', 0.0) + # в ответе пришел результат вычислений + # помещаем в секцию результата + elif ans == 'result': + self.result = data['result'] + # произошла ошибка + elif ans == 'error': + WriteToLog('Error!') + # недокументированный ответ приложения + else: + pass + # возможно, комментарий прольет свет на проблему + self.comment = data.get('comment', '') + def Start(self): - #try: + try: self.state = JOB_RUNNING - WriteToLog('Job started') execpath = self.taskd.execpath # запускаем процесс на выполнение self.proc = subprocess.Popen([execpath, '-r'], shell = True, @@ -192,14 +221,27 @@ class Job: while self.proc.poll() == None: try: msg = ostream.readline() + #msg = msg.strip() self.ProcessMsg(msg) except Exception, e: - WriteToLog('Income msg failed' + str(e)) + #WriteToLog('Income msg failed: ' + str(e)) + pass self.state = JOB_COMPLETED - WriteToLog('Job done') - #except: - #WriteToLog('Job loop failed') - #self.state = JOB_STOPPED + except Exception, e: + WriteToLog('Job loop failed: ' + str(e)) + self.state = JOB_STOPPED + + def SetBusy(self): + self.state = JOB_BUSY + + def IsBusy(self): + return self.state != JOB_READY + + def IsRunning(self): + return self.state == JOB_BUSY or self.state == JOB_RUNNING + + def IsFinished(self): + return self.state == JOB_COMPLETED or self.state == JOB_STOPPED def Stop(self): if self.proc and self.proc.poll() != None: diff --git a/task.py b/task.py index a6b2dc6..7f17dfb 100644 --- a/task.py +++ b/task.py @@ -11,6 +11,8 @@ #!/usr/bin/env python # -*- coding: UTF-8 -*- +import copy + class TaskDescription: """ Description of the task. Task runs on server. @@ -113,8 +115,7 @@ class DataDefinition: self.params = {} for param in self.DD.pdata: self.params[param] = self.DD[param].GetDefault() - - self.taskjob = None + self.job = None def __getitem__(self, label): return self.params[label] @@ -125,6 +126,12 @@ class DataDefinition: else: raise ValueError + def Copy(self): + res = copy.copy(self) + res.params = copy.copy(self.params) + res.job = None + return res + def PackParams(self): package = [] owner = self @@ -138,7 +145,7 @@ class DataDefinition: def Flush(self): server = self.DD.taskd.server datadump = self.PackParams() - server.AddJob(self.DD.taskd, datadump) + self.job = server.AddJob(self.DD.taskd, datadump) #------------------------------------------------------------------------------- @@ -157,14 +164,24 @@ def main(): mdef = DataDefinition(model) pprint(mdef.DD.data) - mdef['x'] = 20 - + mdef['r'] = 3.14 + mdef['n'] = 5 + mdef['d'] = 0 + mdef2 = mdef.Copy() + mdef2['d'] = 30 p = mdef.PackParams() pprint(p) + p = mdef2.PackParams() + pprint(p) mdef.Flush() - mdef.Flush() + mdef2.Flush() + + time.sleep(3) + + print 'RESULT' + pprint(mdef.job.result) + pprint(mdef2.job.result) - time.sleep(15) if __name__ == '__main__': main() diff --git a/tasks/testt.json b/tasks/testt.json index 54d371b..f9d9204 100644 --- a/tasks/testt.json +++ b/tasks/testt.json @@ -4,7 +4,7 @@ "models": { - "simpleexample": { + "sintaylor": { "title": "Simple model for example", "author": "Anton Vakhrushev", @@ -14,20 +14,21 @@ "params": { - "x": { - "type": "int", - "default": 10, - "title": "Main parameter" + "r": { + "type": "double", + "default": 6.28, + "comment": "Right edge" }, - "u": { - "type": "double", - "default": 3.14 + "d": { + "type": "int", + "default": 5, + "comment": "Number of members in taylor serie" }, "n": { "type": "int", - "default": 1000, + "default": 10, "title": "Steps", "comment": "Number of steps for algorithm" } diff --git a/tasks/testt.py b/tasks/testt.py index cfc7e33..b3469c9 100644 --- a/tasks/testt.py +++ b/tasks/testt.py @@ -1,33 +1,91 @@ #! coding: utf-8 +# Тестовое приложение для проекта Opal +# Вычисление значений синуса по формулам Тейлора +# Вычисляет значения для указанного диапазона с заданной точностью +# и нужным количеством шагов + import sys import json -import os -import time def write(msg): - sys.stdout.write(msg + '\n') + sys.stdout.write(str(msg) + '\n') sys.stdout.flush() +def sin_taylor(x, n): + f = 1 + s = 0.0 + e = 1.0 + x0 = x + for i in xrange(n + 1): + #print e, f, x + f *= (2 * i) * (2 * i + 1) if i else 1 + s += e * x / f + x *= x0 * x0 + e *= -1 + return s + +def answer(p): + return json.dumps({ + "answer": "ok", + "value": p + }) + +def error(msg): + return json.dumps({ + "answer": "error", + "comment": msg + }) + +def result(r): + return json.dumps({ + "answer": "result", + "result": { + "table": { + "head": [{"x": "double"}, {"y": "double"}], + "body": r + } + } + }) + def main(): -# try: + try: - if sys.argv[1] == '-i': - with open('testt.json') as f: - d = json.load(f) - write(json.dumps(d, indent = 2)) + if sys.argv[1] == '-i': - elif sys.argv[1] == '-r': - textdata = raw_input() - #data = json.loads(data) - for i in xrange(10): - time.sleep(0.5) - write(json.dumps({ "hello": "world" })) + with open('testt.json') as f: + d = json.load(f) + write(json.dumps(d, indent = 2)) -# except: -# print 'Error!' -# sys.exit(-1) + elif sys.argv[1] == '-r': + + textdata = raw_input() + data = json.loads(textdata) + + if not len(data) and data[0]['label'] != 'sintaylor': + write(error('Unknown model')) + sys.exit(1) + + params = data[0]['params'] + l = 0 # левая граница + r = params['r'] # правая граница + n = params['n'] # количество шагов + d = params['d'] # количество членов в разложении Тейлора + h = float(r - l) / n # шаг сетки по х + res = [] # таблица резултатов + + while l <= r: + y = sin_taylor(l, d) + res.append([l, y]) + write(answer(round(l / r, 2))) + l += h + + write(result(res)) + + except: + write(error('Fatal error')) + sys.exit(1) if __name__ == '__main__': - main() \ No newline at end of file + main()