import datetime import json import uuid import dash from dash import set_props, html, dcc from flask import session, request import feffery_antd_components as fac import feffery_utils_components as fuc import feffery_markdown_components as fmc from dash.dependencies import Input, Output, State, MATCH, ClientsideFunction, ALL from config.global_config import AIChatConfig, ApiBaseUrlConfig from config.env import AppConfig from api.aichat import * from server import app # todo 删除按钮未测, 查询历史记录 去除当前session_id app.clientside_callback( # 处理流式回复内容更新相关过程 ClientsideFunction(namespace="aichat_clientside", function_name="handleStreamResponse"), [Output("chat_data_list", "data", allow_duplicate=True), Output("answer_list", "data", allow_duplicate=True)], Input({"type": "assistant-output-sse", "index": ALL}, "data"), [State("chat_data_list", "data"), State("current_problem", "data"), State("answer_list", "data")], prevent_initial_call=True, ) app.clientside_callback( # 控制用户信息输入框内容的发送 ClientsideFunction( namespace="aichat_clientside", function_name="handleUserNewMessageSend" ), [Output("chat-textarea", "value"), Output("current_problem", "data"), Output("chat_data_list", "data", allow_duplicate=True), Output("current_files", "data", allow_duplicate=True) ], [ Input("control-enter-keypress", "pressedCounts"), Input("aichat_enter-keypress", "pressedCounts"), Input("sent-button", "nClicks") ], [State("chat-textarea", "value"), State("chat_data_list", "data"), State("current_files", "data")], prevent_initial_call=True, ) @app.callback([Output("chat_list_div", "children", allow_duplicate=True), Output("current_problem", "data", allow_duplicate=True), Output("chat_data_list", "data", allow_duplicate=True), Output("chat_session_id", "data", allow_duplicate=True) ], Input("new-chat-window", "nClicks"), prevent_initial_call=True) def new_chat_button_click(nClicks): return [], "", [], uuid.uuid4() @app.callback( Output("chat_data_list", "data", allow_duplicate=True), [Input({"type": "stop_eventsource", "index": ALL}, "nClicks"), Input({"type": "continue_eventsource", "index": ALL}, "nClicks")], [State("chat_data_list", "data"), State("answer_list", "data")], prevent_initial_call=True ) def eventsource_operate(nClick1, nClick2, data, answer_list): if (nClick1 is not None and dash.ctx.triggered_id is not None and 'type' in dash.ctx.triggered_id and dash.ctx.triggered_id['type'] == 'stop_eventsource' and (dash.ctx.triggered[0].get("value"))): index = dash.ctx.triggered_id['index'] data[index]['is_stop'] = True return data if (nClick2 is not None and dash.ctx.triggered_id is not None and 'type' in dash.ctx.triggered_id and dash.ctx.triggered_id['type'] == 'continue_eventsource' and (dash.ctx.triggered[0].get("value"))): index = dash.ctx.triggered_id['index'] data[index]['is_stop'] = False for item in answer_list: chat_index = item['index'] if chat_index == index: data[index]['content'] = item['content'] data[index]['is_end'] = item['is_end'] return data return dash.no_update @app.callback(Input("answer_list", "data"), [State("chat_data_list", "data"), State("chat_session_id", "data")], prevent_initial_call=True) def sync_chat_data(answer_list, chat_data_list, chat_session_id): if answer_list and len(answer_list) > 0 and 'is_end' in answer_list[-1] and answer_list[-1]['is_end']: question = chat_data_list[len(chat_data_list) - 2] question_json = dict( chat_id=None, session_id=chat_session_id, session_name=chat_data_list[0]['content'][:20], type=question['type'], is_end=None, is_stop=None, copy_text=None, user=None, time=question['time'] if 'time' in question else None, content=question['content'], operate=None, thumb_down_reason=None, file=question['file'] if 'file' in question else None ) answer = chat_data_list[-1] answer_json = dict( chat_id=None, session_id=chat_session_id, session_name=chat_data_list[0]['content'][:20], type=answer['type'], is_end=answer['is_end'], is_stop=answer['is_stop'], copy_text=answer['copy_text'], user=None, time=answer['time'] if 'time' in answer else None, content=json.dumps(answer['content']), operate=answer['operate'] if 'operate' in answer else None, thumb_down_reason=answer['thumb_down_reason'] if 'thumb_down_reason' in answer else None, file=answer['file'] if 'file' in answer else None ) add_chat(question_json) answer['content'] = json.dumps(answer['content']) add_chat(answer_json) @app.callback([Output("chat_list_div", "children"), Output("new-chat-window", "disabled")], [Input("chat_data_list", "data")], [State("current_problem", "data"), State("chat_session_id", "data"), State("current_machine", "data")], prevent_initial_call=True) def refresh_chat_window_by_chat_list(data, current_problem, session_id, current_machine): if data is not None and len(data) > 0: children = [] for index, item in enumerate(data): if item['type'] == 'question' and item['content'] is not None: question_children = html.Div( [ fuc.FefferyPostEventSource( id={ "type": "assistant-output-sse", "index": index, }, headers={}, url=AIChatConfig.AIChatUrl, body=json.dumps({ "query": item['content'], "user_id": session.get('user_info').get('user_name'), "robot": current_machine, "session_id": session_id, "doc": json.loads(item['file']) if 'file' in item else [], "history": [] }), autoReconnect={"retries": 0, "delay": 0}, ), fac.AntdAvatar( icon="antd-user", mode="icon", className='avatar', style={ "float": "left", "height": "30px", "background": "#3370FF", } ), html.Div( html.Div( [html.Div(item['content']), fac.AntdSpace( [ fac.AntdButton( fac.AntdTag( content=file['file'] if len(file['file']) < 30 else (file['file'][:30] + "...") , style={ 'color': 'rgb(63 215 121)', 'background-color': 'rgb(230, 255, 239)', 'border-color': 'rgb(102, 255, 161)' }), id={'type': 'file', 'index': str(index)+"_"+str(iddx)}, type='link' ) for iddx, file in enumerate(json.loads(item['file'])) if 'file' in item ], wrap=True, ), ], style={ "padding": '6px 0', "word-break": 'break-all', "white-space": 'pre-wrap' } ), style={"padding-left": '40px', "font-size": "14px"} ) ], className="chat-list-content-lighter", style={ "margin-bottom": '16px', "font-weight": '400', } ) children.append(question_children) if item['type'] == 'answer' and item['content'] is not None: answer_children = html.Div( [ fac.AntdAvatar( mode='image', src='/assets/imgs/logo2.png', className='avatar', style={ "float": "left", "height": "30px" } ), html.Div([ fac.AntdCard( [( html.Div( [fac.AntdTable( columns=[{'title': key, 'dataIndex': key} for key in (content_item['content'][0]).keys()], data=content_item['content'], style={"margin-top": "5px"} ), fac.AntdButton( id={"type": "down_load_table", "index": str(index) + "_" + str(idx)}, icon=fac.AntdIcon(icon="antd-download"), style={"position": "absolute", "top": "10px", "right": "0px"} ) ], style={"width": "100%", "position": "relative"} ) if content_item['type'] == 'table' else ( html.Div( html.Iframe( srcDoc=content_item['content'], style={"width": "100%", "height": "100%", "border": "none"}, className="iframeDoc", id={"type": "html_image", "index": str(index) + '_' + str(idx)}, ), style={"width": "100%", "height": "320px", "margin-top": "5px"} ) if content_item['type'] == 'html_image' else ( html.Div( [ fac.AntdIcon( id={"type": "g6-full-screen-icon", "index": str(index) + '_' + str(idx)}, icon="antd-full-screen", style={"float": "right", "font-size": "25px", "cursor": "pointer"} ), html.Div( id={"type": "G6_ER", "index": str(index) + '_' + str(idx)}, className="G6_ER", style={"width": "100%", "height": "100%", "overflow": "hidden"} )], id={"type": "G6_ER_DIV", "index": str(index) + '_' + str(idx)}, style={"width": "100%", "height": "300px", "overflow": "hidden", "margin-top": "5px"} ) if content_item['type'] == 'G6_ER' else html.Div( fmc.FefferyMarkdown( markdownStr=content_item['content'], style={"font-size": "14px"} ), style={"width": "100%", "margin-top": "5px"} ) ) ) ) for idx, content_item in enumerate(item['content'])], style={ "box-shadow": '0px 2px 4px 0px rgba(31, 35, 41, .12)', "border": 'none', "border-radius": '8px', "box-sizing": 'border-box', }, headStyle={'display': 'none'} ), html.Div( html.Div( [fac.AntdButton("继续", id={"type": "continue_eventsource", "index": index}, type="link") if 'is_stop' in item and item['is_stop'] and 'is_end' in item and not item['is_end'] else None, fac.AntdButton("停止回答", id={"type": "stop_eventsource", "index": index}, type="link") if 'is_stop' in item and not item['is_stop'] and 'is_end' in item and not item['is_end'] else None ] ), style={ "display": "flex", "justify-content": "space-between", "align-items": "center", "margin-top": "8px" } ), html.Div( [ html.Div( fac.AntdText( type='secondary', children=[ html.Span( item['date_time'], style={"margin-left": "4px"} ) ] if 'date_time' in item else [] ) ), html.Div( html.Span( children=[ fac.AntdTooltip( title="换个答案", zIndex=99999, children=[ fac.AntdButton( id={"type": "refresh_answer", "index": index}, type="text", children=[ fac.AntdIcon(icon="antd-reload") ], style={ "padding": "4px" } ) ] ), fac.AntdDivider( direction="vertical" ), fac.AntdTooltip( title="复制", zIndex=99999, children=[ fac.AntdCopyText( id={"type": "copy_answer", "index": index}, text=item['copy_text'], className="chat_operate_icon", style={ "padding": "4px", "z-index": "99999" }, beforeIcon=fac.AntdIcon(icon="antd-copy", style={"color": "black"}) ) ] ), fac.AntdDivider( direction="vertical" ), fac.AntdTooltip( title="取消赞同" if 'operate' in item and item[ 'operate'] == 'thumb_up' else "赞同", zIndex=99999, children=[ fac.AntdButton( id={"type": "thumb_up_answer", "index": index}, type="text", children=[ fac.AntdIcon(icon="md-thumb-up") ], style={"padding": "4px", "color": "#E6A23C"} if 'operate' in item and item['operate'] == 'thumb_up' else {"padding": "4px"} ) ] ) if ('operate' in item and item['operate'] != 'thumb_down') or ('operate' not in item) else None, fac.AntdDivider( direction="vertical" ) if ('operate' in item and item['operate'] != 'thumb_down') or ('operate' not in item) else None, fac.AntdTooltip( title="取消反对" if 'operate' in item and item[ 'operate'] == 'thumb_down' else "反对", zIndex=99999, children=[ fac.AntdButton( id={"type": "cancel_thumb_down_answer", "index": index}, type="text", children=[ fac.AntdIcon(icon="md-thumb-down") ], style={"padding": "4px", "color": "#F56C6C"} ) if 'operate' in item and item['operate'] == 'thumb_down' else fac.AntdPopconfirm( fac.AntdButton( type="text", children=[ fac.AntdIcon(icon="md-thumb-down") ], style={"padding": "4px"} ), id={"type": "thumb_down_answer", "index": index}, title='反对原因', trigger="click", zIndex=99999, description=[ fac.AntdInput( id={'type': 'thumb_down_reason', "index": index}, value=item['thumb_down_reason'] if 'thumb_down_reason' in item else '' ) ], placement="leftBottom", confirmCounts=0 ) ] ) if ('operate' in item and item['operate'] != 'thumb_up') or ('operate' not in item) else None ] ) ) if 'is_end' in item and item['is_end'] and 'is_stop' in item and not item['is_stop'] else html.Div() ], style={ "display": "flex", "justify-content": "space-between", "align-items": "center", } ) ], className="content", style={ "padding-left": '40px' } ) ], className="chat-list-content-lighter", style={ "margin-bottom": '16px', "font-weight": '400', } ) children.append(answer_children) return children, False else: return [], True app.clientside_callback( # 使用目标容器id为Input,利用初次自动回调触发即时渲染 ClientsideFunction(namespace="aichat_clientside", function_name="renderG6"), Output({"type": "G6_ER", "index": ALL}, "children"), Input({"type": "G6_ER", "index": ALL}, "id"), State("chat_data_list", "data"), ) app.clientside_callback( # 使用目标容器id为Input,利用初次自动回调触发即时渲染 ClientsideFunction(namespace="aichat_clientside", function_name="full_g6_modal"), Output("g6-full-screen-body", "children"), Input("g6-modal", "style"), State("current_g6", "data"), prevent_initial_call=True ) @app.callback( Output("current_g6", "data", allow_duplicate=True), Input("g6-exit-full-screen", "nClicks"), prevent_initial_call=True ) def exit_full_screen(nClicks): if nClicks is not None and nClicks > 0: set_props("g6-modal", {"style": { "display": 'none' }}) @app.callback( Output("current_g6", "data"), Input({"type": "g6-full-screen-icon", "index": ALL}, "nClicks"), State("chat_data_list", "data"), prevent_initial_call=True, ) def g6_full_screen_operate(nClicks, data): if (nClicks is not None and dash.ctx.triggered_id is not None and 'type' in dash.ctx.triggered_id and dash.ctx.triggered_id['type'] == 'g6-full-screen-icon' and (dash.ctx.triggered[0].get("value"))): index = dash.ctx.triggered_id['index'] chat_idx = index.split("_")[0] idx = index.split("_")[1] g6_data = data[int(chat_idx)]['content'][int(idx)]['content'] set_props("g6-modal", {"style": { "z-index": '99999', "width": '100%', "height": '100%', "position": 'absolute', "left": '0px', "top": '0px', "display": 'block', "background": "white" }}) return g6_data app.clientside_callback( # 使用目标容器id为Input,利用初次自动回调触发即时渲染 ClientsideFunction(namespace="aichat_clientside", function_name="down_load_table"), Input({"type": "down_load_table", "index": ALL}, "nClicks"), State("chat_data_list", "data"), prevent_initial_call=True ) @app.callback([Output("chat_data_list", "data", allow_duplicate=True), Output("current_problem", "data", allow_duplicate=True)], [Input({"type": "refresh_answer", "index": ALL}, "nClicks"), Input({"type": "thumb_up_answer", "index": ALL}, "nClicks"), Input({"type": "thumb_down_answer", "index": ALL}, "confirmCounts"), Input({"type": "cancel_thumb_down_answer", "index": ALL}, "nClicks")], [State("chat_data_list", "data"), State({'type': 'thumb_down_reason', "index": ALL}, "value")], prevent_initial_call=True) def click_answer_operate_callback(nClick1, nClick2, confirmCounts, nClicks3, data, values): if (nClick1 is not None and dash.ctx.triggered_id and 'type' in dash.ctx.triggered_id and dash.ctx.triggered_id['type'] == 'refresh_answer' and (dash.ctx.triggered[0].get("value"))): index = dash.ctx.triggered_id['index'] current_problem = data[index - 1] data.append(current_problem) return data, current_problem if (nClick2 is not None and dash.ctx.triggered_id and 'type' in dash.ctx.triggered_id and dash.ctx.triggered_id['type'] == 'thumb_up_answer' and (dash.ctx.triggered[0].get("value"))): index = dash.ctx.triggered_id['index'] if 'operate' in data[index] and data[index]['operate'] == 'thumb_up': data[index]['operate'] = None else: data[index]['operate'] = 'thumb_up' answer_json = dict( chat_id=data[index]['chat_id'], session_id=data[index]['session_id'], session_name=data[index]['session_name'], type=data[index]['type'], is_end=data[index]['is_end'], is_stop=data[index]['is_stop'], copy_text=data[index]['copy_text'], user=data[index]['user'], time=data[index]['time'] if 'time' in data[index] else None, content=json.dumps(data[index]['content']), operate=data[index]['operate'] if 'operate' in data[index] else None, thumb_down_reason=data[index]['thumb_down_reason'] if 'thumb_down_reason' in data[index] else None, file=data[index]['file'] if 'file' in data[index] else None ) update_chat(answer_json) return data, dash.no_update if (confirmCounts is not None and dash.ctx.triggered_id and 'type' in dash.ctx.triggered_id and dash.ctx.triggered_id['type'] == 'thumb_down_answer' and (dash.ctx.triggered[0].get("value"))): index = dash.ctx.triggered_id['index'] value = values[int((index - 1) / 2)] data[index]['thumb_down_reason'] = value data[index]['operate'] = 'thumb_down' answer_json = dict( chat_id=data[index]['chat_id'], session_id=data[index]['session_id'], session_name=data[index]['session_name'], type=data[index]['type'], is_end=data[index]['is_end'], is_stop=data[index]['is_stop'], copy_text=data[index]['copy_text'], user=data[index]['user'], time=data[index]['time'] if 'time' in data[index] else None, content=json.dumps(data[index]['content']), operate=data[index]['operate'] if 'operate' in data[index] else None, thumb_down_reason=data[index]['thumb_down_reason'] if 'thumb_down_reason' in data[index] else None, file=data[index]['file'] if 'file' in data[index] else None ) update_chat(answer_json) return data, dash.no_update if (nClicks3 is not None and dash.ctx.triggered_id and 'type' in dash.ctx.triggered_id and dash.ctx.triggered_id['type'] == 'cancel_thumb_down_answer' and (dash.ctx.triggered[0].get("value"))): index = dash.ctx.triggered_id['index'] data[index]['thumb_down_reason'] = None data[index]['operate'] = None answer_json = dict( chat_id=data[index]['chat_id'], session_id=data[index]['session_id'], session_name=data[index]['session_name'], type=data[index]['type'], is_end=data[index]['is_end'], is_stop=data[index]['is_stop'], copy_text=data[index]['copy_text'], user=data[index]['user'], time=data[index]['time'] if 'time' in data[index] else None, content=json.dumps(data[index]['content']), operate=data[index]['operate'] if 'operate' in data[index] else None, thumb_down_reason=data[index]['thumb_down_reason'] if 'thumb_down_reason' in data[index] else None, file=data[index]['file'] if 'file' in data[index] else None ) update_chat(answer_json) return data, dash.no_update return dash.no_update, dash.no_update @app.callback(Output('chat-history-p8', 'children'), Input('chat-history-list', 'data'), prevent_initial_call=True) def refresh_chat_history_list(history_list): div = html.Ul( [html.Li( [fuc.FefferyDiv( children=html.Div( [ html.Span(item.get("session_name")), ], className="flex-between", style={ "display": 'flex', "justify-content": 'space-between', "align-items": 'center', "font-size": "14px", "width": "100%", "height": "100%" } ), style={"width": "90%", "height": "100%", "padding": "0", "color": "#303133"}, id={"type": "chat-history-li-button", "index": str(item['session_id'])}, ), fac.AntdButton( icon=fac.AntdIcon(icon='md-delete', style={"color": "rgb(100, " "106, " "115)"}), type='text', id={'type': 'delete-chat-button', 'index': item['session_id']}, style={"float": "right", "margin-top": "-26px"} )], style={"padding": "6px 16px", "cursor": "pointer"}, className="chat-history-li", ) for item in history_list], style={"list-style": "none", "margin": "0px", "padding": "0px"} ) return div @app.callback([Output('chat-history-list', 'data'), Output('show_chat_history', 'data')], [Input('chat-history-show-button', 'nClicks'), Input('chat-history-cover-child', 'nClicks')], [State('show_chat_history', 'data'), State("chat_session_id", "data")], prevent_initial_call=True) def click_chat_history(nClicks_1, nClick_2, data, session_id): history_list = [] if nClicks_1 and nClicks_1 > 0: history = get_chat_session_list_api(session_id) if history['code'] == 200: history_list = history['data'] # 后期拼接后台接口 show_chat_history = not data['show_chat_history'] return history_list, {'show_chat_history': show_chat_history} @app.callback([Output('sent-button', 'disabled'), Output('sent-button', 'children'), ], Input('chat-textarea', 'value')) def listen_chat_textarea_has_context(value): if value: return False, html.Img( src="/assets/imgs/icon_send_colorful.svg", style={ "height": '24px' }, ) else: return True, html.Img( src="/assets/imgs/icon_send.svg", style={ "height": '24px' }, ) @app.callback(Output('operate-textarea', 'style'), Input('chat-textarea', 'focusing') ) def listen_chat_textarea_focus(focusing): if focusing: return { "box-shadow": '0px 6px 24px 0px rgba(31, 35, 41, 0.08)', "background-color": '#ffffff', "border-radius": '8px', "border": '1px solid #3370FF', "box-sizing": 'border-box', "display": 'flex', "max-width": '860px', "margin": '0 auto', } else: return { "box-shadow": '0px 6px 24px 0px rgba(31, 35, 41, 0.08)', "background-color": '#ffffff', "border-radius": '8px', "border": '1px solid #ffffff', "box-sizing": 'border-box', "display": 'flex', "max-width": '860px', "margin": '0 auto', } @app.callback( [ Output("choose_machine_popover", "open"), Output("choose_machine_popover", "permanent"), Output("choose_machine_popover", "overlayStyle"), ], [Input("chat-textarea", "value"), Input("ai_chat_width", "data")], State("choose_machine_popover", "overlayStyle"), prevent_initial_call=True, ) def open_agents_panel_with_at(value, width, style): """末尾输入新的@时,打开智能体选择面板并切换控制模式""" style['width'] = width if value and value.endswith("@"): return [True, False, style] else: return [False, False, style] @app.callback( [ Output("chat-textarea", "value", allow_duplicate=True), Output("choose_machine_popover", "open", allow_duplicate=True), Output("selected-machine-container", "style"), Output("selected-machine-container", "children"), Output("current_machine", "data"), ], Input({"type": "machine-item", "index": ALL}, "n_clicks"), [ State("chat-textarea", "value"), State("selected-machine-container", "style") ], prevent_initial_call=True, ) def handle_agent_select(_, value, container_style): """处理智能体选中行为需要引发的相关更新""" return [ # 去除输入框末尾@符号 value[:-1] if value.endswith("@") else dash.no_update, # 关闭智能体选择面板 False, # 更新已选中智能体容器样式 {**container_style, "display": "flex"}, # 更新已选中智能体容器内容 [ fac.AntdText( [ "与", fac.AntdTooltip( fac.AntdButton( dash.ctx.triggered_id["index"], id="change-machine", type="text", nClicks=0, # 重置点击次数,以辅助判断 style={"fontWeight": "bold", "paddingLeft": 5, "paddingRight": 5} ), title="更换智能体", ), "对话", ], style={"color": "#8f959f"}, strong=True, ), fac.AntdButton( id="close-selected-machine-container", icon=fac.AntdIcon(icon="antd-close"), type="text", nClicks=0, # 重置点击次数,以辅助判断 ), ], dash.ctx.triggered_id["index"] ] @app.callback( [ Output("selected-machine-container", "style", allow_duplicate=True), Output("current_machine", "data", allow_duplicate=True) ], Input("close-selected-machine-container", "nClicks"), [ State("selected-machine-container", "style"), ], prevent_initial_call=True, ) def handle_selected_agent_clear(nClicks, container_style): """处理已选中智能体清除行为需要引发的相关更新""" if nClicks: return {**container_style, "display": "none"}, None return dash.no_update, dash.no_update app.clientside_callback( # 根据智能体选择面板展开状态,切换控制模式 "(open) => !open", Output("choose_machine_popover", "permanent", allow_duplicate=True), Input("choose_machine_popover", "open"), prevent_initial_call=True, ) app.clientside_callback( # 点击已选中智能体名称,打开智能体选择面板 "(nClicks) => nClicks ? true : false", Output("choose_machine_popover", "open", allow_duplicate=True), Input("change-machine", "nClicks"), prevent_initial_call=True, ) app.clientside_callback( # 通过关闭按钮点击智能体选择面板 "(nClicks) => false", Output("choose_machine_popover", "open", allow_duplicate=True), Input("close-agents-panel", "nClicks"), prevent_initial_call=True, ) @app.callback([Output('chat-history-collapse', 'isOpen'), Output('chat-history-cover', 'style')], Input('show_chat_history', 'data'), prevent_initial_call=True) def change_chat_history_popover_by_show(data): show = data['show_chat_history'] cover_style = { "z-index": '95', "width": '100%', "height": '100%', "position": 'absolute', "left": '0px', "top": '0px', "background": '#747374', "filter": 'alpha(opacity=60)', "opacity": 0.6, "display": 'none' } if show: cover_style['display'] = 'block' else: cover_style['display'] = 'none' return show, cover_style @app.callback([Output("chat_data_list", "data", allow_duplicate=True), Output('show_chat_history', 'data', allow_duplicate=True), Output("current_problem", "data", allow_duplicate=True), Output("chat_session_id", "data", allow_duplicate=True)], Input({'type': 'chat-history-li-button', 'index': ALL}, 'nClicks'), [State('chat_data_list', "data"), State('show_chat_history', 'data'), State("current_problem", "data") ], prevent_initial_call=True) def choose_history_chat(nClicks, chat_data_list, show, current_problem): if (nClicks is not None and dash.ctx.triggered_id['type'] == 'chat-history-li-button' and ( # 判断本次ALL批量触发是否有效 # value为回调角色的属性值,这里代表点击次数nclicks # #正常由用户点击触发的行为,dash.ctx.triggered中第0个元素点击次数应有效 dash.ctx.triggered[0].get("value") )): history_id = dash.ctx.triggered_id['index'] chat_list = get_ai_chat_list_api(history_id) if chat_list['code'] == 200: arr = chat_list['data'] chat_data = [] for item in arr: if item.get('type') == 'answer': item['content'] = json.loads(item['content']) chat_data.append(item) chat_data_list = chat_data return chat_data_list, {'show_chat_history': not show['show_chat_history']}, "", chat_data_list[0]['session_id'] else: return chat_data_list, show, current_problem, dash.no_update @app.callback( Output("chat-history-list", "data", allow_duplicate=True), Input({'type': 'delete-chat-button', 'index': ALL}, 'nClicks'), [State('chat-history-list', 'data'), State("chat_session_id", "data")], prevent_initial_call=True ) def listen_delete_chat_button_click(nClicks, data, current_session_id): if nClicks is not None and dash.ctx.triggered_id['type'] == 'delete-chat-button' and \ ( # 判断本次ALL批量触发是否有效 # value为回调角色的属性值,这里代表点击次数nclicks # #正常由用户点击触发的行为,dash.ctx.triggered中第0个元素点击次数应有效 dash.ctx.triggered[0].get("value") ): delete_chat_session(dash.ctx.triggered_id['index']) chat_history_list = get_chat_session_list_api(current_session_id) history_list = [] if chat_history_list['code'] == 200: history_list = chat_history_list['data'] # 后期拼接后台接口 return history_list return data app.clientside_callback( # 处理聊天区域的自动滚动策略 ClientsideFunction(namespace="aichat_clientside", function_name="handleChatListAreaScroll"), Input("listen-chat-data-list-height", "height") ) # 显示用户导入弹窗及重置上传弹窗组件状态回调 @app.callback( [Output('ai-chat-file-modal', 'visible'), Output('ai-chat-file-modal', 'children')], Input('ai_chat_upload', 'nClicks'), [State('current_files', 'data'), State("chat_session_id", "data"), ], prevent_initial_call=True ) def show_ai_chat_upload(nClicks, current_files, chat_session_id): if nClicks and nClicks > 0: upload_files = [] if len(current_files) > 0: for index, item in enumerate(current_files): upload_files.append({ "name": item['file'], "status": "done", "uid": item['file'], "taskId": None }) return True, html.Div( fac.AntdDraggerUpload( id='aichat-upload-choose', apiUrl=f'{ApiBaseUrlConfig.BaseUrl}/aichat/upload', apiUrlExtraParams={'session_id': chat_session_id}, headers={'Authorization': 'Bearer ' + session.get('Authorization')}, # fileTypes=['xls', 'xlsx'], # fileListMaxLength=1, defaultFileList=upload_files, text='文件导入', hint='点击或拖拽文件至此处进行上传' ), style={ 'marginTop': '10px' } ) return False, dash.no_update @app.callback( Output("current_files", "data"), Input("aichat-upload-choose", "listUploadTaskRecord"), State("current_files", "data"), prevent_initial_call=True ) def init_current_files(records, files): array = [] if records is not None: for item in records: print(item) if 'uploadResponse' in item: uploadResponse = item['uploadResponse'] array.append({'file': uploadResponse['data']['file'], 'bucket': uploadResponse['data']['bucket']}) else: for index, file in enumerate(files): if item['fileName'] == file['file']: array.append(file) return array return dash.no_update @app.callback( Output('file_tags', 'children'), Input("current_files", "data"), prevent_initial_call=True ) def update_file_tags(files): array = [] if files is not None and len(files) > 0: for index, item in enumerate(files): array.append( fac.AntdTag( content=item['file'][:30] + "..." if len(item['file']) > 30 else item['file'], closeIcon=True, id={ 'type': 'tag-close-file', 'index': index, }, style={ 'color': 'rgb(63 215 121)', 'background-color': 'rgb(230, 255, 239)', 'border-color': 'rgb(102, 255, 161)', 'margin-bottom': '5px' } ) ) return array @app.callback( Output("current_files", "data", allow_duplicate=True), Input({'type': 'tag-close-file', 'index': ALL}, 'closeCounts'), State("current_files", "data"), prevent_initial_call=True ) def remove_current_file(closeCounts, files): if closeCounts is not None and dash.ctx.triggered_id is not None and\ dash.ctx.triggered_id['type'] == 'tag-close-file' and \ ( # 判断本次ALL批量触发是否有效 # value为回调角色的属性值,这里代表点击次数nclicks # #正常由用户点击触发的行为,dash.ctx.triggered中第0个元素点击次数应有效 dash.ctx.triggered[0].get("value") ): index = dash.ctx.triggered_id['index'] del files[index] return files return dash.no_update @app.callback( Output('down_load_chat_file', 'data'), Input({'type': 'file', 'index': ALL}, "nClicks"), [State("chat_data_list", "data"), State("chat_session_id", "data")], prevent_initial_call=True ) def down_load_file(nClicks, data, chat_session_id): if nClicks is not None and dash.ctx.triggered_id['type'] == 'file'\ and ( dash.ctx.triggered[0].get("value") ): index = dash.ctx.triggered_id['index'] chat_idx = index.split("_")[0] idx = index.split("_")[1] file = json.loads(data[int(chat_idx)]['file'])[int(idx)] file_name = file['file'] bucket = file['bucket'] export_file_res = export_chat_file_api({'bucket': bucket, 'file': file_name, 'session_id': chat_session_id}) if export_file_res.status_code == 200: export_user = export_file_res.content return dcc.send_bytes(export_user, file_name) return dash.no_update