From 493352ac4b9c172d708c4be48362b77f0398f574 Mon Sep 17 00:00:00 2001 From: anwinged Date: Sun, 22 Apr 2012 07:06:05 +0000 Subject: [PATCH] New perfect server interface --- trunk/forms.py | 1 + trunk/opal.py | 90 +++++----- trunk/server.py | 368 +++++++++++++++++++++++++---------------- trunk/task.py | 78 +-------- trunk/tasks/testt.json | 1 + trunk/tasks/testt.py | 7 +- 6 files changed, 278 insertions(+), 267 deletions(-) diff --git a/trunk/forms.py b/trunk/forms.py index e40e2ae..45aeaef 100644 --- a/trunk/forms.py +++ b/trunk/forms.py @@ -44,6 +44,7 @@ class MainFrame (wx.Frame): # WARNING: wxPython code generation isn't supported for this widget yet. self.m_params = wxpg.PropertyGridManager(self, style = wxpg.PG_TOOLBAR) + self.m_params.AddPage('fp') bSizer4.Add(self.m_params, 1, wx.EXPAND |wx.ALL, 1) bSizer3.Add(bSizer4, 1, wx.EXPAND, 5) diff --git a/trunk/opal.py b/trunk/opal.py index f60763f..1b59d20 100644 --- a/trunk/opal.py +++ b/trunk/opal.py @@ -29,14 +29,13 @@ class MainFrame(forms.MainFrame): def __init__(self): forms.MainFrame.__init__(self, None) - self.server = s = server.LocalServer() - self.server.LoadTasksDescriptions() - ds = s.GetTasksDescriptions() - models = [] - for d in ds: - models.extend(d.GetModelsDescriptions()) - model = models[0] + s = server.LocalServer() + s.LoadModels() + models = s.GetModels() s.Start() + self.server = s + + model = models[0] self.m_user_models.Bind(wx.EVT_TREE_SEL_CHANGED, self.OnModelActivated) @@ -51,8 +50,6 @@ class MainFrame(forms.MainFrame): self.Bind(wx.EVT_CLOSE, self.OnClose) self.Bind(wx.EVT_IDLE, self.OnIdle) - self.m_params.AddPage('fp') - ov = threading.Thread(target = self.Overseer) ov.daemon = 1 ov.start() @@ -66,26 +63,30 @@ class MainFrame(forms.MainFrame): def Overseer(self): try: + um = self.m_user_models + cycle_count = 0 while True: - if True: - wx.MutexGuiEnter() - #print '-- cycle --' - um = self.m_user_models - #um.Freeze() - item = um.GetRootItem() - while item.IsOk(): - md = um.GetPyData(item) - job = md.job if md else None - if job and job.IsRunning(): - t = os.path.basename(job.taskd.execpath) - p = job.percent * 100 - #print t, p - um.SetItemText(item, str(job.GetState()), 1) - um.SetItemText(item, '{}: {:.2F}%'.format(t, p), 2) - item = um.GetNext(item) - #um.Thaw() - wx.MutexGuiLeave() - time.sleep(0.5) + wx.MutexGuiEnter() + print 'cycle {:-8} '.format(cycle_count) + cycle_count += 1 + item = um.GetRootItem() + while item.IsOk(): + data = um.GetPyData(item) + if data: + jid = data[1] + + if jid != None and self.server.IsJobChanged(jid): + tid = self.server.GetJobTID(jid) + meta = self.server.GetTaskMeta(tid) + t = os.path.basename(meta['exec']) + state = self.server.GetJobState(jid) + um.SetItemText(item, str(state[0]), 1) + um.SetItemText(item, '{}: {:%}'.format(t, state[1]), 2) + print jid, state + + item = um.GetNext(item) + wx.MutexGuiLeave() + time.sleep(0.1) except Exception, e: print 'Error in overseer: ', e @@ -98,9 +99,10 @@ class MainFrame(forms.MainFrame): data = task.DataDefinition(model) child = um.AppendItem(root, 'Default') - um.SetPyData(child, data) + jid = self.server.CreateJob() + um.SetPyData(child, [data, jid]) - def SelectUserModel(self, model_def): + def SelectUserModel(self, model_def, jid): def SelectProperty(param_type): """ @@ -142,7 +144,7 @@ class MainFrame(forms.MainFrame): item = event.GetItem() data = self.m_user_models.GetPyData(item) if data: - self.SelectUserModel(data) + self.SelectUserModel(data[0], data[1]) def OnParamChanging(self, event): #value = event.GetValue() @@ -159,26 +161,26 @@ class MainFrame(forms.MainFrame): param = prop.GetClientData() um = self.m_user_models id = um.GetSelection() - md = um.GetItemPyData(id) - md[param] = value + data, jid = um.GetItemPyData(id) + data[param] = value def OnTest(self, event): um = self.m_user_models id = um.GetSelection() - md = um.GetItemPyData(id) - #wx.MessageBox(md.PackParams()) - md.Flush() - #wx.MessageBox('test') + data, jid = um.GetItemPyData(id) + + self.server.LaunchJob(jid, data) def OnDuplicate(self, event): - um = self.m_user_models - id = um.GetSelection() - title = um.GetItemText(id) - parent = um.GetItemParent(id) - md = um.GetItemPyData(id) - child = um.AppendItem(parent, title + ' Copy') - um.SetPyData(child, md.Copy()) + um = self.m_user_models + id = um.GetSelection() + title = um.GetItemText(id) + parent = um.GetItemParent(id) + md, jid = um.GetItemPyData(id) + child = um.AppendItem(parent, title + ' Copy') + jid = self.server.CreateJob() + um.SetPyData(child, [md.Copy(), jid]) self.SetStatusText('Copy for "{}" created'.format(title), 0) def OnIdle(self, event): diff --git a/trunk/server.py b/trunk/server.py index 21be3f3..f5314da 100644 --- a/trunk/server.py +++ b/trunk/server.py @@ -31,23 +31,19 @@ def WriteToLog(msg): print msg -def GenerateId(data): - import hashlib - title = data['title'] - author = data['author'] - id = hashlib.md5(title + author).hexdigest() - return id - class LocalServer: - """ - """ - def __init__(self): - self.max_workers = 2 - self.task_descrs = [] - self.jobs_queue = [] - self.log = None - self.running = False - self.queue_lock = threading.Lock() + def __init__(self, conf = 'tasks.conf', workers = 2): + """ + """ + self.conf = conf # файл с конфигурацией задач + self.workers = workers # количество потоков выполнения + self.tasks_meta = {} # идентификаор задачи + self.models = [] # список моделей + self.next_job_id = 0 # очередной идентификатор работы + self.jobs = {} # очередб работ + self.log = None # + self.running = False # + self.queue_lock = threading.Lock() # init actions @@ -67,25 +63,14 @@ class LocalServer: self.log.write(msg + '\n') print msg - def Start(self): - self.running = True - for i in xrange(self.max_workers): - worker = Worker(self.jobs_queue, self.queue_lock, self.running) - worker.start() - - def Stop(self): - self.running = False - def TestTaskData(self, data): pass - def LoadTasksDescriptions(self, source = 'tasks.conf'): - """ - """ - self.task_descrs = [] - + def LoadModels(self): + self.tasks_meta = {} + self.models = [] self.WriteToLog('tasks interrogation starts') - for line in open(source, 'r'): + for line in open(self.conf, 'r'): try: # нормализуем указанный путь line = os.path.normpath(line) @@ -93,14 +78,29 @@ class LocalServer: # считываем данные через shell (важно для скриптовых языков) textdata = subprocess.check_output([line, '-i'], shell = True, cwd = os.path.dirname(line)) + # загружаем данные описания задачи data = json.loads(textdata) # провряем их на корректность self.TestTaskData(data) - # пакуем все в объект-описание задачи - task_descr = task.TaskDescription(self, line, data) - # добавляем в список описаний - self.task_descrs.append(task_descr) + + # вычисляем псевдоуникальный идентификатор модели + tid = hash(data['meta']) + # сохраняем описание задачи + self.tasks_meta[tid] = { + 'title': data.get('title', ''), + 'author': data.get('author', ''), + 'meta': data['meta'], + 'exec': line + } + + # выделяем описания моделей + ms = data.get('models', {}) + for label, data in ms.iteritems(): + model_descr = task.DataDescription(None, label, data, tid) + # добавляем в список описаний + self.models.append(model_descr) + self.WriteToLog('Task from "{}" asked'.format(line)) except IOError, e: self.WriteToLog('file "{}" not found'.format(line)) @@ -109,166 +109,244 @@ class LocalServer: except ValueError, e: self.WriteToLog('file "{}" not opened, error "{}")'.format(line, e)) - def GetTasksDescriptions(self): - """ - Return list with task descriptions - """ - return self.task_descrs + def GetModels(self): + return self.models + + def GetTaskMeta(self, tid): + return self.tasks_meta.get(tid) + + #-------------------------------------------------------------------------- + + def CreateJob(self): + jid = self.next_job_id + self.next_job_id += 1 + with self.queue_lock: + self.jobs[jid] = Job() + return jid def GetJobsCount(self): - pass + return len(self.jobs) - def GetJob(self, index): - pass + def GetJobState(self, jid): + job = self.jobs.get(jid) + if job != None: + return job.GetState() - def AddJob(self, taskd, datadump): - job = Job(taskd, datadump) - with self.queue_lock: - self.jobs_queue.append(job) - WriteToLog('Job added') - return job + def IsJobChanged(self, jid): + job = self.jobs.get(jid) + if job != None: + return job.IsChanged() + else: + False + + def GetJobResult(self, jid): + job = self.jobs.get(jid) + if job != None: + return job.GetResult() + + def GetJobTID(self, jid): + job = self.jobs.get(jid) + if job != None: + return job.tid + + def LaunchJob(self, jid, data_def): + job = self.jobs.get(jid) + if job != None: + tid = data_def.DD.tid + datadump = data_def.PackParams() + job.Launch(tid, datadump) + return True + + def StopJob(self, jid): + job = self.jobs.get(jid) + if job != None: + job.Stop() + + #-------------------------------------------------------------------------- + + def Start(self): + self.running = True + for i in xrange(self.workers): + worker = Worker(self) + worker.start() + + def Stop(self): + self.running = False #------------------------------------------------------------------------------- class Worker(threading.Thread): number = 0 - def __init__(self, queue, lock, runflag): + def __init__(self, server): threading.Thread.__init__(self) - self.queue = queue - self.lock = lock - self.runflag = runflag + self.server = server self.daemon = True - WriteToLog('worker started') self.id = Worker.number Worker.number += 1 + WriteToLog('worker started') + + def FindNextJob(self): + with self.server.queue_lock: + for jid, job in self.server.jobs.iteritems(): + # если нашли ожидающую вызова работу + if job.state == JOB_READY: + job.state = JOB_RUNNING # пометим, как запущенную + WriteToLog('Job ({}) found'.format(jid)) + return job + return None + + def ProcessJob(self, job): + try: + execpath = self.server.GetTaskMeta(job.tid)['exec'] + # запускаем процесс на выполнение + proc = subprocess.Popen([execpath, '-r'], shell = True, + stdin = subprocess.PIPE, stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, cwd = os.path.dirname(execpath)) + job.proc = proc + # передаем стартовые параметры + proc.stdin.write(job.datadump + '\n') + proc.stdin.flush() + # пока процесс не завершится (или его не прибьют) + while proc.poll() == None: + msg = proc.stdout.readline() + self.ProcessMessage(job, msg) + if not self.server.running: + proc.kill() + raise KeyError + except Exception, e: + WriteToLog('Job loop failed: ' + str(e)) + job.Finish(JOB_STOPPED) + else: + job.Finish(JOB_COMPLETED, 1.0) + + + def ProcessMessage(self, job, msg): + try: + # разбираем полученный ответ + data = json.loads(msg) + # извлекаем оттуда ответ + ans = data['answer'] + # ответ получен ок или предупреждение + # записываем значение прогресса, если имеется + if ans == 'ok' or ans == 'warning': + job.percent = data.get('value', 0.0) + # в ответе пришел результат вычислений + # помещаем в секцию результата + elif ans == 'result': + job.result = data['result'] + # произошла ошибка + elif ans == 'error': + WriteToLog('Error! ' + msg) + # недокументированный ответ приложения + else: + pass + # возможно, комментарий прольет свет на проблему + job.comment = data.get('comment', '') + # почему изменяем флаг состояния здесь в конце? + # потому как только после правильной обработки сообщения + # мы можем быть уверены, что состояние действительно изменилось + job.ChangeState() + except KeyError as e: + pass + except ValueError as e: + pass def Cycle(self): - job = None # найти следующее готовое к выполнению задание - with self.lock: - for j in self.queue: - if not j.IsBusy(): - job = j - job.SetBusy() - break + job = self.FindNextJob() # и, если нашли, приступаем к выполнению if job: WriteToLog("{} started!".format(self.id)) - job.Start(self.runflag) + self.ProcessJob(job) WriteToLog("{} finished!".format(self.id)) else: time.sleep(1) def run(self): while True: - if not self.runflag: + if not self.server.running: return self.Cycle() #------------------------------------------------------------------------------- JOB_READY = 0 -JOB_BUSY = 1 -JOB_RUNNING = 2 -JOB_STOPPED = 3 -JOB_COMPLETED = 4 -JOB_DROPPED = 5 +JOB_RUNNING = 1 +JOB_STOPPED = 2 +JOB_COMPLETED = 3 class Job: - def __init__(self, taskd, datadump): - self.taskd = taskd - self.datad = datadump - self.state = JOB_READY - self.percent = 0.0 - self.comment = '' - self.result = None - self.proc = None - self.client_data = None + def __init__(self): + self.tid = None + self.datadump = None + self.state = JOB_STOPPED # состояние выполнения работы + self.percent = -1.0 # прогресс (от 0.0 до 1.0 или -1.0) + self.comment = '' # комментарий к ходу выполнения + self.result = None # результат вычислений + self.proc = None # ссылка на субпроцесс + self.state_id = 0 + self.last_state_id = 0 - def ProcessMsg(self, msg): - # разбираем полученный ответ - data = json.loads(msg) - # извлекаем оттуда ответ - ans = data['answer'] - # ответ получен ок или предупреждение - # записываем значение прогресса, если имеется - if ans == 'ok' or ans == 'warning': - self.percent = data.get('value', 0.0) - # в ответе пришел результат вычислений - # помещаем в секцию результата - elif ans == 'result': - self.result = data['result'] - # произошла ошибка - elif ans == 'error': - WriteToLog('Error! ' + msg) - # недокументированный ответ приложения - else: - pass - # возможно, комментарий прольет свет на проблему - self.comment = data.get('comment', '') + def ChangeState(self): + self.state_id += 1 + def GetState(self): + self.last_state_id = self.state_id + return (self.state, self.percent, self.comment) - def Start(self, runflag): - try: - self.state = JOB_RUNNING - execpath = self.taskd.execpath - # запускаем процесс на выполнение - self.proc = subprocess.Popen([execpath, '-r'], shell = True, - stdin = subprocess.PIPE, stdout = subprocess.PIPE, - stderr = subprocess.STDOUT, cwd = os.path.dirname(execpath)) - # передаем стартовые параметры - istream = self.proc.stdin - ostream = self.proc.stdout - istream.write(self.datad + '\n') - istream.flush() - # пока процесс не завершится (или его не прибьют) - while self.proc.poll() == None: - try: - msg = ostream.readline() - #msg = msg.strip() - self.ProcessMsg(msg) + def IsChanged(self): + return self.state_id != self.last_state_id - if not runflag: - self.Stop() - # todo вписать исключения, которые относятся к JSON & dict - except Exception, e: - #WriteToLog('Income msg failed: ' + str(e)) - pass - self.state = JOB_COMPLETED - except Exception, e: - WriteToLog('Job loop failed: ' + str(e)) - self.state = JOB_STOPPED - - def SetBusy(self): - self.state = JOB_BUSY - - def IsBusy(self): - return self.state != JOB_READY - - def IsRunning(self): - return self.state == JOB_BUSY or self.state == JOB_RUNNING - - def IsFinished(self): - return self.state == JOB_COMPLETED or self.state == JOB_STOPPED - - def IsComplete(self): - return self.GetStatus() == JOB_COMPLETE + def Launch(self, tid, datadump): + self.tid = tid + self.datadump = datadump + self.state = JOB_READY + self.ChangeState() def Stop(self): WriteToLog('Try to kill') if self.proc and self.proc.poll() == None: self.proc.kill() + self.ChangeState() WriteToLog('Job killed') - def GetState(self): - return self.state + def Finish(self, state, percent = None): + self.proc = None + self.state = state + if percent: + self.percent = percent + self.ChangeState() def GetResult(self): return self.result +#------------------------------------------------------------------------------- + +import time, random +from pprint import pprint + def main(): - pass + s = LocalServer(workers = 2) + s.LoadModels() + s.Start() + models = s.GetModels() + model = models[0] + md = task.DataDefinition(model) + md['d'] = 10 + md['r'] = 3.14 + + slots = [ s.CreateJob() for i in xrange(5) ] + for jid in slots: + md['n'] = random.randint(20, 30) + print jid, md['n'] + s.LaunchJob(jid, md) + + time.sleep(30) + + for jid in slots: + pprint(s.GetJobResult(jid)) + print '' if __name__ == '__main__': main() diff --git a/trunk/task.py b/trunk/task.py index a027f2c..dbf35d0 100644 --- a/trunk/task.py +++ b/trunk/task.py @@ -12,29 +12,7 @@ # -*- coding: UTF-8 -*- import copy - -class TaskDescription: - """ - Description of the task. Task runs on server. - """ - def __init__(self, server, execpath, data): - """ - ``server`` is owner of task process - - ``execpath`` - path to task executable - - ``data`` is parsed data presentation about models, methods - and meta information - """ - self.server = server - self.execpath = execpath - self.data = data - self.models = [] - for label, data in self.data['models'].iteritems(): - self.models.append(DataDescription(None, label, data, self)) - - def GetModelsDescriptions(self): - return self.models +import json #------------------------------------------------------------------------------- @@ -68,14 +46,15 @@ class Parameter: #------------------------------------------------------------------------------- class DataDescription: - def __init__(self, parent, label, data, taskd): + def __init__(self, parent, label, data, tid): self.parent = parent self.label = label self.data = data - self.taskd = taskd + self.tid = tid # создание описаний параметров self.pdata = self.data.get('params', {}) + # заменяем текстовое описание на объект-параметр for label in self.pdata: par = Parameter(self.pdata[label]) self.pdata[label] = par @@ -83,7 +62,7 @@ class DataDescription: self.specs = [] # рекурсивное создание описаний спецификаций for label, data in self.data.get('spec', {}).iteritems(): - self.specs.append(DataDescription(self, label, data, self.taskd)) + self.specs.append(DataDescription(self, label, data, self.tid)) def GetLabel(self): return self.label @@ -142,50 +121,3 @@ class DataDefinition: package.reverse() return json.dumps(package) - def Flush(self): - server = self.DD.taskd.server - datadump = self.PackParams() - self.job = server.AddJob(self.DD.taskd, datadump) - -#------------------------------------------------------------------------------- - -import server, json, time -from pprint import pprint - -def main(): - s = server.LocalServer() - s.LoadTasksDescriptions() - ds = s.GetTasksDescriptions() - models = [] - for d in ds: - models.extend(d.GetModelsDescriptions()) - - model = models[0] - - mdef = DataDefinition(model) - pprint(mdef.DD.data) - mdef['r'] = 3.14 - mdef['n'] = 5 - mdef['d'] = 0 - mdef2 = mdef.Copy() - mdef2['d'] = 30 - p = mdef.PackParams() - pprint(p) - p = mdef2.PackParams() - pprint(p) - - #mdef.Flush() - mdef2.Flush() - - time.sleep(1) - mdef2.job.Stop() - - time.sleep(5) - print mdef2.job.GetState() - - print 'RESULT' - #pprint(mdef.job.result) - pprint(mdef2.job.result) - -if __name__ == '__main__': - main() diff --git a/trunk/tasks/testt.json b/trunk/tasks/testt.json index 19a39ba..d67d7a1 100644 --- a/trunk/tasks/testt.json +++ b/trunk/tasks/testt.json @@ -1,6 +1,7 @@ { "title": "Example task", "author": "Anton Vakhrushev", + "meta": "av-example-task", "models": { diff --git a/trunk/tasks/testt.py b/trunk/tasks/testt.py index 6be521f..cfc15c7 100644 --- a/trunk/tasks/testt.py +++ b/trunk/tasks/testt.py @@ -42,10 +42,7 @@ def result(r): return json.dumps({ "answer": "result", "result": { - "table": { - "head": [{"x": "double"}, {"y": "double"}], - "body": r - } + "table": [[ {"x": "double"}, {"y": "double"} ]] + r } }) @@ -81,7 +78,7 @@ def main(): res.append([l, y]) write(answer(l / r)) l += h - time.sleep(0.2) + #time.sleep(0.1) write(result(res))