flask查询es并保存到csv

使用python查询es,通过flask展示数据并将结果以附件的形式保存
程序结构如下
.
├── app
│   ├── forms                表单类目录
│   │   ├── article.py
│   │   ├── base.py
│   │   ├── __init__.py
│   ├── __init__.py          项目核心文件,完成flask初始化,配置加载,蓝图注册等任务
│   ├── libs                      程序调用的底层类和函数
│   │   ├── error_code.py
│   │   ├── errors.py
│   │   ├── es.py
│   │   ├── save_csv.py
│   │   └── test.csv
│   ├── secure.py      机密配置文件,数据库连接等,不上传到仓库
│   ├── setting.py      通用配置文件,图片路径等,上传到仓库
│   ├── static           静态文件目录
│   │   ├── csv
│   │   └── pic
│   ├── templates     模板目录
│   │   ├── all_result.html
│   │   ├── article.html
│   │   └── base.html
│   ├── view_models    处理底层类和函数的类型,用来封装数据
│   │   ├── article.py
│   │   ├── __init__.py
│   └── web             视图函数目录
│       ├── blueprint.py    蓝图定义文件
│       ├── index.py        蓝图文件
│       ├── __init__.py     导入蓝图
│       └── user.py         蓝图文件
├── exclude.list
├── manager.py      生产环境运行文件
├── Pipfile
├── readme.txt
├── requrement.txt
└── run.py        开发环境运行文件
1、创建项目核心文件 app/__init__.py内容如下

from flask import Flask
import os


# 定义一个函数用来创建app核心对象
def create_app():
    """Build, configure and return the Flask application object.

    Loads ``app.secure`` and then ``app.setting`` into the config, rewrites
    ``CSV_DIR`` as a path under this package's directory, and registers the
    blueprint.
    """
    app = Flask(__name__)
    # secure: kept out of git; setting: committed to git.
    for config_module in ('app.secure', 'app.setting'):
        app.config.from_object(config_module)
    # Turn the configured CSV_DIR into a path under this package's directory.
    package_dir = os.path.abspath(os.path.dirname(__file__))
    app.config["CSV_DIR"] = os.path.join(package_dir, app.config["CSV_DIR"])
    register_blueprint(app)
    return app


#  创建注册蓝图的方法
def register_blueprint(app):
    """Register the ``web`` blueprint on *app*.

    The blueprint is imported inside the function rather than at module
    level.
    """
    from app.web.blueprint import web as web_blueprint
    app.register_blueprint(web_blueprint)

2、创建项目配置文件app/setting.py 和app/secure.py 内容如下
secure.py

# Enable debug mode
DEBUG = True
# Elasticsearch connection settings
ES_HOST = '10.3.1.92'
ES_PORT = 9500

# Port used for the web server
SERVER_PORT = 80

setting.py

# -*- coding:utf-8 -*-
# Shared project settings: the same in production and development, so this file can be committed to git

# Image directory
PIC_DIR = "static/pic/"

# CSV file directory
CSV_DIR = "static/csv/"

3、创建libs下的底层类和函数
error_code.py

from app.libs.errors import APIException


class Success(APIException):
    """Success response: HTTP status 201, ``error_code`` 0, message ``'ok'``."""
    error_code = 0
    msg = 'ok'
    code = 201


class DeleteSuccess(Success):
    """Delete-success response: HTTP status 202, ``error_code`` -1."""
    error_code = -1
    code = 202


class ServerError(APIException):
    """Server-side failure: HTTP status 500, ``error_code`` 999."""
    msg = 'sorry, we make a mistake'
    error_code = 999
    code = 500


class ClientTypeError(APIException):
    """Invalid client: HTTP status 400, ``error_code`` 1006."""
    error_code = 1006
    msg = 'client is invalid'
    code = 400


class ParameterException(APIException):
    """Invalid request parameter: HTTP status 400, ``error_code`` 1000."""
    error_code = 1000
    msg = 'invalid parameter'
    code = 400


class NotFound(APIException):
    """Resource not found: HTTP status 404, ``error_code`` 1001."""
    error_code = 1001
    msg = 'the resource are not found'
    code = 404


class AuthFailed(APIException):
    """Authorization failure: HTTP status 401, ``error_code`` 1002."""
    error_code = 1002
    msg = 'authorization failed'
    code = 401


class Forbidden(APIException):
    """Access outside the allowed scope: HTTP status 403, ``error_code`` 1004."""
    error_code = 1004
    msg = 'forbidden, not in scope'
    code = 403

errors.py

# -*- coding: utf-8 -*-
import json

from flask import request
from werkzeug.exceptions import HTTPException


class APIException(HTTPException):
    """HTTP exception whose response body is a JSON document.

    The body carries ``msg``, ``error_code`` and a ``request`` string made of
    the HTTP method and the request path without its query string.
    Subclasses override the class attributes ``code`` (HTTP status),
    ``error_code`` (application error code) and ``msg``.
    """

    code = 500
    error_code = 999
    msg = 'sorry, we make a mistake'

    def __init__(self, msg=None, code=None, error_code=None,
                 headers=None):
        """Override the class-level defaults for this instance.

        :param msg: replacement message; a falsy value keeps the default.
        :param code: replacement HTTP status; a falsy value keeps the default.
        :param error_code: replacement application error code; only ``None``
            keeps the default, because ``0`` is a legitimate value.
        :param headers: accepted for interface compatibility; not used.
        """
        if code:
            self.code = code
        # Compare against None so an explicit error_code=0 is not discarded.
        if error_code is not None:
            self.error_code = error_code
        if msg:
            self.msg = msg
        super(APIException, self).__init__(self.msg, None)

    def get_body(self, environ=None, scope=None):
        """Return the JSON text used as the response body.

        ``scope`` is accepted (and ignored) because newer Werkzeug releases
        pass it in addition to ``environ``.
        """
        body = dict(
            msg=self.msg,
            error_code=self.error_code,
            request=request.method+' '+self.get_url_no_param()
        )
        return json.dumps(body)

    def get_headers(self, environ=None, scope=None):
        """Return the response headers, declaring a JSON content type."""
        return [('Content-Type', 'application/json')]

    @staticmethod
    def get_url_no_param():
        """Return the current request's full path up to the first ``?``."""
        return request.full_path.split('?')[0]

es.py

from elasticsearch import helpers
from elasticsearch import Elasticsearch
from flask import current_app
import time


class EsObject:
    """Query helper for the ``articlesearch`` Elasticsearch index.

    Connection settings are read from the Flask config keys ``ES_HOST`` and
    ``ES_PORT`` via ``current_app``, so an application context must be
    active when an instance is created.  The criteria used by
    ``help_article_content`` live on the instance (``KeyWord``,
    ``StartTime``, ``StopTime``) and are normally filled in with
    ``set_attrs``.
    """

    # Lower bound applied when the caller supplies no start time.
    _DEFAULT_START_TIME = '1970-01-01T08:00:00'
    _TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
    _INDEX = "articlesearch"
    _DOC_TYPE = "fulltext"

    def __init__(self):
        self.ES_HOST = current_app.config['ES_HOST']
        self.ES_PORT = current_app.config['ES_PORT']
        self.es = Elasticsearch([{'host': self.ES_HOST, 'port': self.ES_PORT}])
        # Result of the most recent query: a scan generator or a list of hits.
        self.res = None
        self.total = 0
        self.KeyWord = ''
        self.StartTime = ''
        self.StopTime = ''
        self.Title = True
        self.Url = True
        self.Time = True

    @classmethod
    def _build_query(cls, field, keyword, start_time, stop_time):
        """Return a phrase-match query on *field* filtered by a time range.

        A missing or empty *start_time* falls back to the default lower
        bound; a missing or empty *stop_time* falls back to the current
        local time.
        """
        if not start_time:
            start_time = cls._DEFAULT_START_TIME
        if not stop_time:
            stop_time = time.strftime(cls._TIME_FORMAT, time.localtime())
        return {"query": {
            "bool": {"must": {"match_phrase": {field: keyword}},
                     "filter": {"range": {"time": {"gte": start_time, "lte": stop_time}}}}}}

    def _scan(self, field, keyword, start_time, stop_time):
        """Store in ``self.res`` a generator over every hit for the query."""
        query = self._build_query(field, keyword, start_time, stop_time)
        self.res = helpers.scan(self.es, index=self._INDEX, doc_type=self._DOC_TYPE, query=query)

    def _search(self, field, keyword, start_time, stop_time, start, size):
        """Run one paged search and return ``(articles, total)``."""
        query = self._build_query(field, keyword, start_time, stop_time)
        query["from"] = 0 if start is None else start
        query["size"] = 5 if size is None else size
        result = self.es.search(index=self._INDEX, doc_type=self._DOC_TYPE, body=query)
        self.res = result['hits']['hits']
        # NOTE(review): stored exactly as the server returns it; its shape may
        # differ between Elasticsearch versions - confirm against the cluster.
        self.total = result['hits']['total']
        return self._save_result(), self.total

    def help_article_content(self, **kw):
        """Scan every article whose content matches ``self.KeyWord``.

        Uses the ``KeyWord``, ``StartTime`` and ``StopTime`` attributes; empty
        time bounds are replaced by defaults instead of being sent to the
        server as empty strings.  The generator is left in ``self.res``.
        ``**kw`` is accepted for compatibility and ignored.
        """
        self._scan("content", self.KeyWord, self.StartTime, self.StopTime)

    def help_article_title(self, keyword, start_time, stop_time):
        """Scan every article whose title matches *keyword*.

        The generator is left in ``self.res``.
        """
        self._scan("title", keyword, start_time, stop_time)

    def set_attrs(self, attrs_dict):
        """Copy values from *attrs_dict* onto attributes that already exist.

        Keys that do not name an existing attribute are skipped.
        """
        for key, value in attrs_dict.items():
            if hasattr(self, key):
                setattr(self, key, value)

    def search_article_content(self, keyword, start_time, stop_time, start=None, size=None):
        """Return one page of articles whose content matches *keyword*.

        :param start: offset of the first hit; ``None`` means 0.
        :param size: number of hits to return; ``None`` means 5.
        :return: ``(list_of_article_dicts, total)``.
        """
        return self._search("content", keyword, start_time, stop_time, start, size)

    def search_article_title(self, keyword, start_time, stop_time, start=None, size=None):
        """Return one page of articles whose title matches *keyword*.

        :param start: offset of the first hit; ``None`` means 0.
        :param size: number of hits to return; ``None`` means 5.
        :return: ``(list_of_article_dicts, total)``.
        """
        return self._search("title", keyword, start_time, stop_time, start, size)

    def _save_result(self):
        """Reduce each hit in ``self.res`` to its title, url and time."""
        return [
            {
                'title': article['_source']['title'],
                'url': article['_source']['url'],
                'time': article['_source']['time'],
            }
            for article in self.res
        ]

    def _test_argument(self, dict_argument):
        """Placeholder for argument validation; not implemented."""
        pass


if __name__ == '__main__':
    # Manual smoke test.  EsObject() reads current_app.config, so a Flask
    # application context has to be active before it is constructed.
    # Run from the project root (e.g. ``python -m app.libs.es``) so that the
    # ``app`` package can be imported.
    from app import create_app

    with create_app().app_context():
        es = EsObject()
        attrs_dict = {'KeyWord': '每日经济', 'StartTime': '2019-11-01T00:00:00', 'StopTime': '2019-11-06T00:00:00',
                      'Title': True, 'Url': True}
        es.set_attrs(attrs_dict)
        es.help_article_content()
        for article in es.res:
            print(article)

save_csv.py

import csv
from app.libs.es import EsObject
from app.view_models.article import ArticlesViewModel
from flask import current_app


def save_csv(keyword, articles):
    """Write *articles* to ``<keyword>.csv`` under ``CSV_DIR``.

    :param keyword: search keyword, used as the base of the file name.
    :param articles: iterable of objects with ``Title``, ``Url`` and ``Time``
        attributes; one CSV row is written per object, in that column order.
    :return: the name (not the full path) of the file that was written.
    """
    import os

    # The keyword comes from user input: neutralise path separators so the
    # file is always created directly inside CSV_DIR.
    safe_keyword = keyword.replace('/', '_').replace('\\', '_')
    file_name = safe_keyword + ".csv"
    file_path = os.path.join(current_app.config['CSV_DIR'], file_name)
    # An explicit encoding keeps non-ASCII titles from depending on the
    # platform's default encoding.
    with open(file_path, 'w', newline='', encoding='utf-8') as wf:
        writer = csv.writer(wf)
        writer.writerows(
            (article.Title, article.Url, article.Time) for article in articles
        )
    return file_name


if __name__ == '__main__':
    # Manual smoke test.  Both EsObject() and save_csv() read
    # current_app.config, so a Flask application context has to be active.
    # Run from the project root (e.g. ``python -m app.libs.save_csv``).
    from app import create_app

    with create_app().app_context():
        es = EsObject()
        attrs_dict = {'KeyWord': '每日经济', 'StartTime': '2019-11-01T00:00:00', 'StopTime': '2019-11-06T00:00:00',
                      'Title': True, 'Url': True, 'Time': True}
        es.set_attrs(attrs_dict)
        es.help_article_content()
        articles = ArticlesViewModel()
        articles.save(es.res, attrs_dict)
        # save_csv takes the keyword first; it is used as the file name.
        save_csv(attrs_dict['KeyWord'], articles.articles)

4、创建视图函数目录 web 下的
__init__.py

# -*- coding:utf-8 -*-
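# Import the modules that contain the view functions.
# web/blueprint.py only defines the blueprint; it does not import the view modules under web/.
# A view function whose module is never imported is never registered on the blueprint.

# The views in index.py and user.py are not imported anywhere else, so they must be imported here; otherwise their routes are not registered and requests get a 404.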
#from app.web import server
from app.web import user
#from app.web import account
from app.web import index

blueprint.py

# 定义蓝图

from flask import Blueprint

# Define a general-purpose blueprint named "web"; web/__init__.py imports the view modules under web/ so that their view functions are registered on this blueprint
web = Blueprint('web', __name__)
# Importing the view functions from the modules under web/ is left to web/__init__.py

index.py

from .blueprint import web
from urllib.parse import quote
from app.forms.article import SearchForm, ResultForm
from flask import request, render_template, send_file
from app.libs.es import EsObject
from app.libs.save_csv import save_csv
from app.view_models.article import ArticlesViewModel
from flask import current_app
import xlsxwriter
from io import BytesIO


@web.route('/', methods=['GET'])
def index():
    """Return a fixed greeting for the site root."""
    return 'Hello'


# 查询文章,只能返回部分内容
@web.route('/article/', methods=['GET', 'POST'])
def article():
    """Search article content and render one page of results.

    On a valid POST the keyword, optional time range and the paging fields
    ``Start``/``Size`` are taken from the form and passed to
    ``EsObject.search_article_content``.  Otherwise the page is rendered
    with an empty result list.
    """
    form = SearchForm(request)
    form2 = ResultForm(request)
    list_article = []
    article_count = 0
    if request.method == 'POST' and form.validate():
        criteria = form.data
        es = EsObject()
        # Empty time strings become None so the search applies its defaults.
        # Paging comes from the form rather than being fixed at 0 / 5.
        list_article, article_count = es.search_article_content(
            criteria['KeyWord'],
            criteria['StartTime'] or None,
            criteria['StopTime'] or None,
            criteria['Start'],
            criteria['Size'],
        )
    return render_template('article.html', form=form, form2=form2, list_article=list_article,
                           article_count=article_count)


# 查询文章,返回所有内容
@web.route('/all_article/', methods=['GET', 'POST'])
def all_article():
    """Fetch every matching article, save the rows to CSV and render them.

    When the form validates, the form data is copied onto an ``EsObject``,
    the full result set is scanned, wrapped in ``ArticlesViewModel`` and
    written to a CSV file named after the keyword.  ``file_name`` stays
    empty until a file has actually been written, so the template has
    nothing to link to before that.
    """
    form = SearchForm(request)
    list_result = []
    file_name = ''
    count = 0
    if form.validate():
        criteria = form.data
        es = EsObject()
        # Store the search criteria on the query object.
        es.set_attrs(criteria)
        es.help_article_content()
        articles = ArticlesViewModel()
        articles.save(es.res, criteria)
        count = articles.total
        list_result = articles.articles
        # Persist the rows so they can be downloaded as an attachment.
        file_name = save_csv(criteria['KeyWord'], list_result)

    return render_template('all_result.html', form=form, result=list_result, count=count, file_name=file_name)


@web.route("/download/<file_name>", methods=["GET"])
def download(file_name):
    """Send ``CSV_DIR/<file_name>`` to the client as an attachment.

    ``send_from_directory`` is used instead of concatenating the path by
    hand, so the file is resolved inside ``CSV_DIR`` and a missing file
    produces a 404 response.  The percent-encoded name is also appended as
    ``filename*`` for non-ASCII file names.
    """
    from flask import send_from_directory

    encoded_name = quote(file_name)
    rv = send_from_directory(current_app.config['CSV_DIR'], file_name,
                             as_attachment=True, attachment_filename=encoded_name)
    rv.headers['Content-Disposition'] += "; filename*=utf-8''{}".format(encoded_name)
    return rv

5、创建forms目录下的
base.py

# -*- coding: utf-8 -*-
from wtforms import Form
from wtforms.validators import DataRequired as WTFDataRrequired
from app.libs.error_code import ParameterException


class BaseForm(Form):
    """Form that collects its input from a Flask request object.

    Values are gathered from the JSON body, the query string and the
    submitted form data.
    """

    def __init__(self, request):
        """Build the form from *request*.

        JSON body values and query-string values are merged into one dict
        (query string wins on a key collision) and passed as ``data``.
        Query parameters are deliberately not expanded into keyword
        arguments: a parameter named like one of the constructor's own
        arguments (``data``, ``formdata``, ``prefix`` ...) would otherwise
        break or alter form construction.
        """
        json_data = request.get_json(silent=True)
        merged = dict(json_data) if isinstance(json_data, dict) else {}
        merged.update(request.args.to_dict())
        super(BaseForm, self).__init__(formdata=request.form, data=merged)

    def validate_for_api(self):
        """Validate the form and return it.

        :raises ParameterException: carrying ``self.errors`` when validation
            fails.
        """
        valid = super(BaseForm, self).validate()
        if not valid:
            raise ParameterException(msg=self.errors)
        return self


class DataRequired(WTFDataRrequired):
    """WTForms ``DataRequired`` with a default message built from the label.

    When no message was given, one is generated from the field's label text
    the first time the validator runs and is then kept on the validator.
    """

    def __call__(self, form, field):
        if self.message is None:
            label = field.label.text
            self.message = ''.join((label, '不能为空,请填写', label))
        super().__call__(form, field)

article.py

# -*- coding: utf-8 -*-
from app.forms.base import BaseForm, DataRequired
from wtforms import  StringField, IntegerField,  SubmitField, BooleanField
from wtforms.validators import Length, NumberRange


class SearchForm(BaseForm):
    """Search form: keyword, time range, paging and output-column switches."""

    # Search phrase: required, 2 to 20 characters.
    KeyWord = StringField(
        render_kw={"placeholder": "搜索关键字,长度2-20"},
        validators=[
            DataRequired(message='关键字不能为空'),
            Length(2, 20, message='关键字长度2到20'),
        ],
    )
    # Time range bounds; no validators are attached to either field.
    StartTime = StringField(render_kw={"placeholder": "开始时间:2019-05-30T00:00:00"})
    StopTime = StringField(render_kw={"placeholder": "结束时间:2019-05-30T13:00:00"})
    # Paging: offset of the first hit and number of hits.
    Start = IntegerField(default=0, label='起始位置', validators=[NumberRange(min=0)])
    Size = IntegerField(
        default=5,
        label='数量',
        validators=[DataRequired(), NumberRange(min=1, max=100)],
    )
    # Column switches, all enabled by default.
    Title = BooleanField(default=True, label='标题')
    Time = BooleanField(default=True, label='时间')
    Url = BooleanField(default=True, label='url')
    submit = SubmitField('提交')

6、创建view_models目录下的
article.py

class ArticleViewModel:
    """One article row exposing ``Title``, ``Url`` and ``Time`` attributes."""

    def __init__(self):
        # Every column starts empty; cut_data fills in the requested ones.
        self.Title = self.Url = self.Time = ''

    def cut_data(self, list_field, article):
        """Copy the requested fields from a search hit onto this object.

        Each name in *list_field* is set as an attribute, reading the value
        from ``article['_source']`` under the lower-cased name.  Returns
        ``self``.
        """
        name_pairs = ((name, name.lower()) for name in list_field)
        for attr_name, source_key in name_pairs:
            setattr(self, attr_name, article['_source'][source_key])
        return self


# Takes an iterable of search hits and stores the processed rows as a list
class ArticlesViewModel:
    """Collection of ``ArticleViewModel`` rows built from search hits."""

    def __init__(self):
        self.total = 0
        self.articles = []

    def save(self, result_generator, dict_field):
        """Convert every hit in *result_generator* into an article row.

        :param result_generator: iterable of hits, each with a ``_source``
            mapping.
        :param dict_field: mapping of names to values (for example the form
            data); only names whose value is ``True`` are copied per row.

        The rows are stored in ``self.articles`` and their count in
        ``self.total``; nothing is returned.
        """
        list_field = self._fill_field(dict_field)
        self.articles = [ArticleViewModel().cut_data(list_field, article) for article in result_generator]
        self.total = len(self.articles)

    def _fill_field(self, dict_field):
        """Return the names of the columns the form asked to output.

        A name is selected only when its value is the boolean ``True``.
        An identity check is used because ``1 == True`` in Python, so an
        integer field holding 1 would otherwise be taken for a selected
        column.  The ``submit`` entry is always skipped.
        """
        return [
            key for key, value in dict_field.items()
            if value is True and key != 'submit'
        ]

7、创建templates模板目录下的
base.html

<!DOCTYPE html>
<html>

<head>
    <title>CMS文件管理平台</title>
    {% block head_meta %}
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta name="description" content="">
    <meta name="author" content="">
    {% endblock %}
    {# {% block head_css %}#}
    {#
    <link href="{{ url_for('static', filename='bootstrap/bootstrap3/swatch/default/bootstrap.min.css') }}"
          rel="stylesheet">
    #}
    {#
    <link href="{{ url_for('static', filename='bootstrap/bootstrap3/css/bootstrap-theme.min.css') }}" rel="stylesheet">
    #}
    {#
    <link href="{{ url_for('static', filename='admin/css/bootstrap3/admin.css') }}" rel="stylesheet">
    #}
    {#
    <style>#
    }
    {
    #
    body {
        #
    }

    {
        # padding-top: 4px
    ;
        #
    }
    {
        #
    }
    #
    }
    {
        #        </style>
    #}
    {# {% endblock %}#}
    {% block head %}
    <link rel="stylesheet" href="https://cdn.staticfile.org/twitter-bootstrap/3.3.7/css/bootstrap.min.css">
    <script src="https://cdn.staticfile.org/jquery/2.1.1/jquery.min.js"></script>
    <script src="https://cdn.staticfile.org/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    {% endblock %}

</head>

<body>
{% block page_body %}
<div class="container">
    <nav class="navbar navbar-default" role="navigation">
        <!-- Brand and toggle get grouped for better mobile display -->
        <div class="navbar-header">
            <button type="button" class="navbar-toggle" data-toggle="collapse" data-target="#admin-navbar-collapse">
                <span class="sr-only">Toggle navigation</span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>
        </div>
        <!-- navbar content -->
        {% block main_menu %}
        {# Navigation menu: one dropdown linking to the partial-result and full-result search pages #}
        <ul class="nav navbar-nav">
            {# {{ layout.menu() }}#}
            <li class="dropdown">
                <a data-toggle="dropdown" class="dropdown-toggle" href="#" role="button">
                    文章搜索
                    <b class="caret"></b>
                </a>
                <ul class="dropdown-menu">
                    <li>
                        <a href="{{ url_for('web.article') }}">返回部分</a>
                    </li>
                    <li>
                        <a href="{{ url_for('web.all_article') }}">返回全部</a>
                    </li>
                </ul>
            </li>
        </ul>
        {% endblock %}

        {% block menu_links %}

        {% endblock %}
        {% block access_control %}
        {% endblock %}
    </nav>

    {% block messages %}
    {% with messages = get_flashed_messages() %}
    {% for message in messages %}
    {% if message %}
    <div class="alert alert-warning" role="alert">{{ message }}</div>
    {% endif %}
    {% endfor %}
    {% endwith %}
    {% if form and form.errors %}
    {% for key, error in form.errors.items() %}
    <div class="alert alert-warning" role="alert">{{ error }}</div>
    {% endfor %}
    {% endif %}
    {% endblock %}

    {% block body %}{% endblock %}
</div>
{% endblock %}
</body>

{% block script %}
{% endblock %}
</html>

article.html

{% extends "base.html" %}
{% block body %}
{# Search page: the first form submits the search criteria to the current URL; the second form lists the returned articles (list_article) using the fields of form2. #}
{# NOTE(review): the second form's action calls url_for('web.download') without file_name, but the download route is declared as /download/<file_name> and accepts GET only - confirm the intended target of this form. #}

    <form action="" method="post">
        {{ form.csrf_token }}
        {{ form.KeyWord }}<br>
        {% for msg in form.KeyWord.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.StartTime }}<br>
        {% for msg in form.StartTime.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.StopTime }}<br>
        {% for msg in form.StopTime.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.Start.label }}<br>
        {{ form.Start }}<br>
        {% for msg in form.Start.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.Size.label }}<br>
        {{ form.Size }}<br>
        {% for msg in form.Size.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.submit }}<br>
    </form>
    <form action="{{ url_for('web.download') }}" method="post">
    <table>
        <tr>
            <td>标题</td>
            <td>时间</td>
            <td>url</td>
        </tr>
        {{ form2.csrf_token }}
        {% for article in list_article %}
            <tr name="123">
                <td>{{ form2.Title(value=article['title']) }}</td>
                <td>{{ form2.Time(value=article['time']) }}</td>
                <td>{{ form2.Url(value=article['url']) }}</td>
                <!--
                <td><input type="text" name="Title" class="form-control"  value="{{ article['title'] }}"></td>
                <td><input type="text" name="Time" class="form-control"  value="{{ article['time'] }}"></td>
                <td><input type="text" name="Url" class="form-control"  value="{{ article['url'] }}"></td>
                -->
            </tr>
        {% endfor %}
    </table>
        {{form2.submit}}<br>
    </form>
{% endblock %}

all_result.html

{% extends "base.html" %}
{% block body %}
    {# Full-result page: search form, download link for the generated CSV, then one table row per article in result. #}
    <form action="" method="post">
        {{ form.KeyWord }}<br>
        {% for msg in form.KeyWord.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.StartTime }}<br>
        {% for msg in form.StartTime.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.StopTime }}<br>
        {% for msg in form.StopTime.errors %}
            <small class="errors">{{ msg }}</small><br>
        {% endfor %}
        {{ form.Title.label }}{{ form.Title }}<br>
        {{ form.Url.label }}{{ form.Url }}<br>
        {{ form.Time.label }}{{ form.Time }}<br>
        {{ form.submit }}<br>
    </form>
    {# The download link is shown when a file name is available; otherwise the page says no attachment was generated. #}
    {% if file_name %}
        附件下载
        <a href="{{ url_for('web.download',file_name=file_name) }}">{{file_name}}</a>
    {% else %}
        附件未生成
    {% endif %}
    <table>
        <tr>
            <td>标题</td>
            <td>时间</td>
            <td>url</td>
        </tr>
        {% for article in result %}
            <tr>
                <td>{{ article.Title }}</td>
                <td>{{ article.Time }}</td>
                <td>{{ article.Url }}</td>
            </tr>
        {% endfor %}
    </table>
{% endblock %}

8、创建项目启动文件
manager.py

# -*- coding:utf-8 -*-
# 线上部署运行入口文件

from app import create_app
from flask_script import Server, Manager

app = create_app()
manager = Manager(app)

# Register a custom "runserver" command.
# This is the production entry point and it listens on 0.0.0.0, so the
# interactive debugger and the reloader follow the DEBUG setting instead of
# being switched on unconditionally.
debug_enabled = bool(app.config.get('DEBUG'))
manager.add_command('runserver',
                    Server(host='0.0.0.0', port=app.config['SERVER_PORT'],
                           use_debugger=debug_enabled, use_reloader=debug_enabled))


def main():
    """Run the flask_script ``manager`` defined at module level."""
    manager.run()


if __name__ == '__main__':
    import sys
    import traceback

    try:
        sys.exit(main())
    except Exception:
        # Print the failure and exit with a non-zero status so that callers
        # (shell scripts, process supervisors) can detect it.
        traceback.print_exc()
        sys.exit(1)

run.py

# -*- coding:utf-8 -*-
# 项目入口文件,负责完成web服务器的启动、配置文件的加载和读取、初始化flask核心对象(或将初始化任务放入app/__init__.py)
# 不将业务相关的代码放在这里
# 开发时使用

# 从app这个包引入自定义的创建app对象的函数
from app import create_app

app = create_app()

if __name__ == '__main__':
    # Development server: the port comes from the SERVER_PORT setting, the
    # same setting manager.py uses, and debug mode follows DEBUG.
    app.run(host='0.0.0.0', port=app.config['SERVER_PORT'], debug=app.config['DEBUG'])

 

发表评论