opal/server.py
2012-04-10 07:28:46 +00:00

265 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#-------------------------------------------------------------------------------
# Name: server.py
# Purpose:
#
# Author: Anton Vakhrushev
#
# Created: 14.03.2012
# Copyright: (c) Anton Vakhrushev 2012
# Licence: LGPL
#-------------------------------------------------------------------------------
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import os
import sys
import json
import time
import datetime
import threading
import subprocess
import task
globallock = threading.Lock()
def WriteToLog(msg):
with globallock:
tm = str(datetime.datetime.now())
msg = tm + ' ' + str(msg)
#self.log.write(msg + '\n')
print msg
def GenerateId(data):
import hashlib
title = data['title']
author = data['author']
id = hashlib.md5(title + author).hexdigest()
return id
class LocalServer:
"""
"""
def __init__(self):
self.max_workers = 2
self.task_descrs = []
self.jobs_queue = []
self.log = None
self.queue_lock = threading.Lock()
# init actions
self.log = open('log.txt', 'w')
self.WriteToLog('local server initialized')
for i in xrange(2):
worker = Worker(self.jobs_queue, self.queue_lock)
worker.start()
def Close(self):
self.WriteToLog('local server closed\n')
def __del__(self):
self.Close()
def WriteToLog(self, msg):
tm = str(datetime.datetime.now())
msg = tm + ' ' + msg
self.log.write(msg + '\n')
print msg
def Start(self):
pass
def Stop(self):
pass
def TestTaskData(self, data):
pass
def LoadTasksDescriptions(self, source = 'tasks.conf'):
"""
"""
self.task_descrs = []
self.WriteToLog('tasks interrogation starts')
for line in open(source, 'r'):
try:
# нормализуем указанный путь
line = os.path.normpath(line)
line = os.path.abspath(line)
# считываем данные через shell (важно для скриптовых языков)
textdata = subprocess.check_output([line, '-i'], shell = True,
cwd = os.path.dirname(line))
# загружаем данные описания задачи
data = json.loads(textdata)
# провряем их на корректность
self.TestTaskData(data)
# пакуем все в объект-описание задачи
task_descr = task.TaskDescription(self, line, data)
# добавляем в список описаний
self.task_descrs.append(task_descr)
self.WriteToLog('Task from "{}" asked'.format(line))
except IOError, e:
self.WriteToLog('file "{}" not found'.format(line))
except subprocess.CalledProcessError, e:
self.WriteToLog('file "{}" not opened, error {} (msg: {})'.format(line, e, e.output))
except ValueError, e:
self.WriteToLog('file "{}" not opened, error "{}")'.format(line, e))
def GetTasksDescriptions(self):
"""
Return list with task descriptions
"""
return self.task_descrs
def GetJobsCount(self):
pass
def GetJob(self, index):
pass
def AddJob(self, taskd, datadump):
job = Job(taskd, datadump)
with self.queue_lock:
self.jobs_queue.append(job)
WriteToLog('Job added')
return job
#-------------------------------------------------------------------------------
class Worker(threading.Thread):
number = 0
def __init__(self, queue, lock):
threading.Thread.__init__(self)
self.queue = queue
self.lock = lock
self.daemon = True
WriteToLog('worker started')
self.id = Worker.number
Worker.number += 1
def Cycle(self):
job = None
# найти следующее готовое к выполнению задание
with self.lock:
for j in self.queue:
if not j.IsBusy():
job = j
job.SetBusy()
break
# и, если нашли, приступаем к выполнению
if job:
WriteToLog("{} started!".format(self.id))
job.Start()
WriteToLog("{} finished!".format(self.id))
else:
time.sleep(1)
def run(self):
while True:
self.Cycle()
#-------------------------------------------------------------------------------
JOB_READY = 0
JOB_BUSY = 1
JOB_RUNNING = 2
JOB_STOPPED = 3
JOB_COMPLETED = 4
class Job:
def __init__(self, taskd, datadump):
self.taskd = taskd
self.datad = datadump
self.state = JOB_READY
self.percent = 0.0
self.comment = ''
self.result = None
self.proc = None
def ProcessMsg(self, msg):
# разбираем полученный ответ
data = json.loads(msg)
# извлекаем оттуда ответ
ans = data['answer']
# ответ получен ок или предупреждение
# записываем значение прогресса, если имеется
if ans == 'ok' or ans == 'warning':
self.percent = data.get('value', 0.0)
# в ответе пришел результат вычислений
# помещаем в секцию результата
elif ans == 'result':
self.result = data['result']
# произошла ошибка
elif ans == 'error':
WriteToLog('Error!')
# недокументированный ответ приложения
else:
pass
# возможно, комментарий прольет свет на проблему
self.comment = data.get('comment', '')
def Start(self):
try:
self.state = JOB_RUNNING
execpath = self.taskd.execpath
# запускаем процесс на выполнение
self.proc = subprocess.Popen([execpath, '-r'], shell = True,
stdin = subprocess.PIPE, stdout = subprocess.PIPE,
stderr = subprocess.STDOUT, cwd = os.path.dirname(execpath))
# передаем стартовые параметры
istream = self.proc.stdin
ostream = self.proc.stdout
istream.write(self.datad + '\n')
istream.flush()
# пока процесс не завершится (или его не прибьют)
while self.proc.poll() == None:
try:
msg = ostream.readline()
#msg = msg.strip()
self.ProcessMsg(msg)
except Exception, e:
#WriteToLog('Income msg failed: ' + str(e))
pass
self.state = JOB_COMPLETED
except Exception, e:
WriteToLog('Job loop failed: ' + str(e))
self.state = JOB_STOPPED
def SetBusy(self):
self.state = JOB_BUSY
def IsBusy(self):
return self.state != JOB_READY
def IsRunning(self):
return self.state == JOB_BUSY or self.state == JOB_RUNNING
def IsFinished(self):
return self.state == JOB_COMPLETED or self.state == JOB_STOPPED
def Stop(self):
if self.proc and self.proc.poll() != None:
self.proc.kill()
WriteToLog('Job killed')
def GetState(self):
return self.state
def IsComplete(self):
return self.GetStatus() == JOB_COMPLETE
def GetResult(self):
return self.result
def main():
pass
if __name__ == '__main__':
main()