Python 代码实践小结

  • A+
所属分类:Python

最近写了较多的 Python 脚本,将最近自己写的脚本进行一个总结,其中有些是 Python 独有的,有些是所有程序设计中共有的:

  1. 考虑使用 Logger(logger 怎么配置,需要输出哪些信息 — 可以反向考虑,自己看到这个 logger 的时候想了解什么信息)
  2. 传递的数据结构如何考虑(是否对调用方有先验知识的要求,比如返回一个 Tuple,则需要用户了解 tuple 中元素的顺序,这样情况是否应该进行封装;),数据结构定义清楚了,很多东西也就清楚了。
  3. 如何操作数据库(可以学习 sqlalchemy,包括 core 和 orm 两种 api)
  4. 异常如何处理(异常应该分开捕获 — 可以清楚的知道什么情况下导致的,异常之后应该打印日志说明出现什么问题,如果情况恶劣需要进行异常再次抛出或者报警)
  5. 所有获取资源的地方都应该做 check(a. 没有获取到会怎么办;b.获取到异常的怎么办)
  6. 所有操作资源的地方都应该检查是否操作成功
  7. 每个函数都应该简短,如果函数过长应该进行拆分(有个建议值,函数包含的行数应该在 20-30 行之间,具体按照这个规范做过一次之后就会发现这样真好)
  8. 使用 class 之后,考虑重构 __str__ 函数,用户打印输出(如果不实现 __str__ ,会调用 __repr__ ),如果对象放到 collection 中之后,需要实现 __repr__ 函数,用于打印整个 collection 的时候,直观显示
  9. 如果有些资源会发生变化,可以单独抽取出来,做成函数,这样后续调用就可以不用改变了

上述总结肯定有片面的地方,也有不全的地方,欢迎指出

Python 代码实践小结

附上一份 Python2.7 代码(将一些私有的东西进行了修改)

  1. # -*- coding:utf-8 -*-
  2. from sqlalchemy import create_engine
  3. import logging
  4. from logging.config import fileConfig
  5. import requests
  6. import Clinet # 私有的模块
  7. fileConfig("logging_config.ini")
  8. logger = logging.getLogger("killduplicatedjob")
  9. #配置可以单独放到一个模块中
  10. DB_USER = "xxxxxxx"
  11. DB_PASSWORD = "xxxxxxxx"
  12. DB_PORT = 111111
  13. DB_HOST_PORT = "xxxxxxxxxx"
  14. DB_DATA_BASE = "xxxxxxxxxxx"
  15. REST_API_URL = "http://sample.com"
  16. engine = create_engine("mysql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST_PORT, DB_PORT, DB_DATA_BASE))
  17. # 这个 class 是为了在函数间传递时,不需要使用方了解属性的具体顺序而写的,也可以放到一个单独的模块中
  18. class DuplicatedJobs(object):
  19.  def __init__(self, app_id, app_name, user):
  20.  self.app_id = app_id
  21.  self.app_name = app_name
  22.  self.user = user
  23. def __repr__(self):
  24.  return '[appid:%s, app_name:%s, user:%s]' % (self.app_id, self.app_name, self.user)
  25. def find_duplicated_jobs():
  26.  logger.info("starting find duplicated jobs")
  27.  (running_apps, app_name_to_user) = get_all_running_jobs()
  28.  all_apps_on_yarn = get_apps_from_yarn_with_queue(get_resource_queue())
  29. duplicated_jobs = []
  30.  for app in all_apps_on_yarn:
  31.  (app_id, app_name) = app
  32. if app_id not in running_apps:
  33.  if not app_name.startswith("test"):
  34.  logger.info("find a duplicated job, prefixed_name[%s] with appid[%s]" % (app_name, app_id))
  35.  user = app_name_to_user[app_name]
  36.  duplicated_jobs.append(DuplicatedJobs(app_id, app_name, user))
  37.  else:
  38.  logger.info("Job[%s] is a test job, would not kill it" % app_name)
  39. logger.info("Find duplicated jobs [%s]" % duplicated_jobs)
  40. return duplicated_jobs
  41. def get_apps_from_yarn_with_queue(queue):
  42.  param = {"queue": queue}
  43.  r = requests.get(REST_API_URL, params=param)
  44.  apps_on_yarn = []
  45.  try:
  46.  jobs = r.json().get("apps")
  47.  app_list = jobs.get("app", [])
  48.  for app in app_list:
  49.  app_id = app.get("id")
  50.  name = app.get("name")
  51.  apps_on_yarn.append((app_id, name))
  52. except Exception as e: #Exception 最好进行单独的分开,针对每一种 Exception 进行不同的处理
  53.  logger.error("Get apps from Yarn Error, message[%s]" % e.message)
  54. logger.info("Fetch all apps from Yarn [%s]" % apps_on_yarn)
  55. return apps_on_yarn
  56. def get_all_running_jobs():
  57.  job_infos = get_result_from_mysql("select * from xxxx where xx=yy")
  58. app_ids = []
  59.  app_name_to_user = {}
  60.  for (topology_id, topology_name) in job_infos:
  61.  status_set = get_result_from_mysql("select * from xxxx where xx=yy")
  62.  application_id = status_set[0][0]
  63.  if "" != application_id:
  64.  configed_resource_queue = get_result_from_mysql(
  65.  "select * from xxxx where xx=yy")
  66.  app_ids.append(application_id)
  67.  app_name_to_user[topology_name] = configed_resource_queue[0][0].split(".")[1]
  68. logger.info("All running jobs appids[%s] topology_name2user[%s]" % (app_ids, app_name_to_user))
  69.  return app_ids, app_name_to_user
  70. def kill_duplicated_jobs(duplicated_jobs):
  71.  for job in duplicated_jobs:
  72.  app_id = job.app_id
  73.  app_name = job.app_name
  74.  user = job.user
  75.  logger.info("try to kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user))
  76.  try:
  77.  Client.kill_job(app_id, user)
  78.  logger.info("Job[%s] with appid[%s] for user[%s] has been killed" % (app_name, app_id, user))
  79.  except Exception as e:
  80.  logger.error("Can't kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user))
  81. def get_result_from_mysql(sql):
  82.  a = engine.execute(sql)
  83.  return a.fetchall()
  84. # 因为下面的资源可能发生变化,而且可能包含一些具体的逻辑,因此单独抽取出来,独立成一个函数
  85. def get_resource_queue():
  86.  return "xxxxxxxxxxxxx"
  87. if __name__ == "__main__":
  88.  kill_duplicated_jobs(find_duplicated_jobs())

其中 logger 配置文件如下(对于 Python 的 logger,官方文档写的非常好,建议读一次,并且实践一次)

  1. [loggers]
  2. keys=root, simpleLogger
  3. [handlers]
  4. keys=consoleHandler, logger_handler
  5. [formatters]
  6. keys=formatter
  7. [logger_root]
  8. level=WARN
  9. handlers=consoleHandler
  10. [logger_simpleLogger]
  11. level=INFO
  12. handlers=logger_handler
  13. propagate=0
  14. qualname=killduplicatedjob
  15. [handler_consoleHandler]
  16. class=StreamHandler
  17. level=WARN
  18. formatter=formatter
  19. args=(sys.stdout,)
  20. [handler_logger_handler]
  21. class=logging.handlers.RotatingFileHandler
  22. level=INFO
  23. formatter=formatter
  24. args=("kill_duplicated_streaming.log", "a", 52428800, 3,)
  25. [formatter_formatter]
  26. format=%(asctime)s %(name)-12s %(levelname)-5s %(message)s

来源:36大数据

End.

36大数据
R语言神经网络模型银行客户信用评估数据
深入浅出数据分析(中文版)
基于大数据的用户特征分析
中国大数据生态图谱&大数据交易市场专题研究报告

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: