Maximo 7.6.1.1与外部JSON API集成的自动化脚本开发及架构选型咨询
Maximo 7.6.1.1 集成外部REST API解决方案详解
首先解决你当前脚本的核心错误:mbo is not defined。因为你用的是发布通道的External Exit(用户出口类),Jython脚本的入口参数不是直接的mbo,而是ExternalExitContext对象。你需要从上下文里获取当前MBO实例,同时不要再自己手动构造HttpClient调用——Maximo的发布通道框架已经帮你处理了HTTP请求的发送,你只需要在Exit里修改Payload、调整端点参数,以及处理响应即可。
基础脚本修正(解决mbo未定义+利用Maximo集成框架)
from com.ibm.json.java import JSONObject from org.apache.commons.codec.binary import Base64 from psdi.iface.router import ExternalExitContext from psdi.mbo import MboRemote from psdi.server import MXServer def execute(context): # 从上下文获取当前MBO mbo = context.getMbo() if not mbo: return # 获取端点配置(替代手动调用Router.getHandler) endpoint = context.getEndpoint() endpoint_url = endpoint.getProperty("URL") http_method = endpoint.getProperty("HTTPMETHOD") username = endpoint.getProperty("USERNAME") password = endpoint.getProperty("PASSWORD") # 构造Basic Auth Header(并更新端点的HEADERS) auth_str = f"{username}:{password}" auth_base64 = Base64.encodeBase64String(auth_str.encode('ISO-8859-1')) existing_headers = endpoint.getProperty("HEADERS") updated_headers = f"{existing_headers}\nAuthorization: Basic {auth_base64}" endpoint.setProperty("HEADERS", updated_headers) # 后续按场景处理逻辑...
分场景需求实现(基于External Exit脚本)
你的所有场景都可以在同一个External Exit脚本里通过分支判断实现,以下是各场景的核心逻辑:
1. 外部创建的SR被取消(SOURCE=LEAKREP,EXTERNALRECID非空)
def handle_canceled_external_sr(mbo): # 验证条件 if mbo.getString("SOURCE") != "LEAKREP" or mbo.isNull("EXTERNALRECID"): return None # 获取最新的logtype=CANCOMM的工作日志 worklog_set = mbo.getMboSet("WORKLOG") worklog_set.setWhere("LOGTYPE='CANCOMM'") worklog_set.setSortOrder("REPORTDATE DESC") latest_worklog = worklog_set.moveFirst() if not latest_worklog: return None # 构造JSON Payload payload = JSONObject() payload.put("logtype", "CANCOMM") payload.put("summary", latest_worklog.getString("DESCRIPTION")) payload.put("description_longdescription", latest_worklog.getString("LONGDESCRIPTION")) payload.put("userid", mbo.getString("LASTCHANGEDBY")) return payload
2. 外部创建的SR被关闭
def handle_closed_external_sr(mbo): if mbo.getString("SOURCE") != "LEAKREP" or mbo.isNull("EXTERNALRECID"): return None # 获取最新的logtype≠CANCOMM的工作日志 worklog_set = mbo.getMboSet("WORKLOG") worklog_set.setWhere("LOGTYPE<>'CANCOMM'") worklog_set.setSortOrder("REPORTDATE DESC") latest_worklog = worklog_set.moveFirst() if not latest_worklog: return None payload = JSONObject() payload.put("summary", latest_worklog.getString("DESCRIPTION")) payload.put("description_longdescription", latest_worklog.getString("LONGDESCRIPTION")) return payload
3. 重复工单处理
def handle_duplicate_sr(mbo): # 假设你有一个自定义字段标识重复工单,比如DUPLICATEFLAG=Y if mbo.getString("DUPLICATEFLAG") != "Y": return None payload = JSONObject() # 添加基础字段(按需补充) # ... # 获取相似TEMPLATEID的未结SR(表域对应的逻辑) sr_set = MXServer.getMXServer().getMboSet("SR", mbo.getUserInfo()) sr_set.setWhere(f"TEMPLATEID='{mbo.getString('TEMPLATEID')}' AND STATUS NOT IN ('CLOSED','CANCELLED') AND SRNUM!='{mbo.getString('SRNUM')}'") duplicate_srs = [] sr = sr_set.moveFirst() while sr: duplicate_srs.append(sr.getString("SRNUM")) sr = sr_set.moveNext() payload.put("DUPLICATE", duplicate_srs) return payload
4. Maximo原生LEAK类型SR(无SOURCE/EXTERNALRECID)
def handle_native_leak_sr(mbo, context): if mbo.getString("SOURCE") is not None or not mbo.isNull("EXTERNALRECID") or mbo.getString("TEMPLATEID") not in ["LEAK","LEAKH","LEAKW"]: return None # 构造包含SR、TKSERVICEADDRESS、WORKLOG的Payload payload = JSONObject() # SR字段 payload.put("description", mbo.getString("DESCRIPTION")) payload.put("reportdate", mbo.getString("REPORTDATE")) payload.put("reportedby", mbo.getString("REPORTEDBY")) # TKSERVICEADDRESS字段 service_addr = mbo.getMboSet("TKSERVICEADDRESS").moveFirst() if service_addr: payload.put("formattedaddress", service_addr.getString("FORMATTEDADDRESS")) # WORKLOG字段(所有存在的工作日志) worklogs = [] worklog_set = mbo.getMboSet("WORKLOG") worklog = worklog_set.moveFirst() while worklog: wl_obj = JSONObject() wl_obj.put("description", worklog.getString("DESCRIPTION")) wl_obj.put("longdescription", worklog.getString("LONGDESCRIPTION")) worklogs.append(wl_obj) worklog = worklog_set.moveNext() payload.put("worklogs", worklogs) # 设置HTTPMETHOD为POST(覆盖端点配置) context.getEndpoint().setProperty("HTTPMETHOD", "POST") # 处理响应更新SOURCE和EXTERNALRECID(需启用发布通道同步模式) response = context.getResponse() if response and response.getStatusCode() == 200: response_json = JSONObject.parse(response.getInputStream()) external_ticket_id = response_json.get("ticketid") if external_ticket_id: mbo.setValue("SOURCE", "LEAKREP") mbo.setValue("EXTERNALRECID", external_ticket_id) mbo.save() return payload
动态更新端点参数
因为你的端点已经勾选了Allow Override,所以可以直接在脚本里修改参数:
- 修改HTTP方法:
context.getEndpoint().setProperty("HTTPMETHOD", "PATCH")(比如在更新外部工单时用PATCH) - 添加/修改Headers:如前面的Basic Auth示例,直接拼接现有Headers并更新端点的HEADERS属性
架构选择:单一组件 vs 多组件
推荐使用单一对象结构、发布通道、端点+单个External Exit脚本,原因如下:
- 集中维护:所有业务逻辑都在一个脚本里,后续维护人员不需要在多个发布通道/端点之间切换查找逻辑,只需要看一个地方的分支判断。
- 减少冗余:避免重复创建相同的对象结构、端点配置,减少系统中的冗余组件。
- 逻辑清晰:通过模块化函数+条件分支,每个场景的处理逻辑独立,添加注释后可读性极强。
当然,如果未来某个场景的逻辑变得异常复杂(比如需要完全不同的路由规则、数据转换),再考虑拆分单独的发布通道,但目前你的所有场景都基于同一个对象结构,单一组件方案更优。
内容的提问来源于stack exchange,提问作者Rick




