You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Maximo 7.6.1.1与外部JSON API集成的自动化脚本开发及架构选型咨询

Maximo 7.6.1.1 集成外部REST API解决方案详解

首先解决你当前脚本的核心错误:mbo is not defined。因为你用的是发布通道的External Exit(用户出口类),Jython脚本的入口参数不是直接的mbo,而是ExternalExitContext对象。你需要从上下文里获取当前MBO实例,同时不要再自己手动构造HttpClient调用——Maximo的发布通道框架已经帮你处理了HTTP请求的发送,你只需要在Exit里修改Payload、调整端点参数,以及处理响应即可。

基础脚本修正(解决mbo未定义+利用Maximo集成框架)

from com.ibm.json.java import JSONObject
from org.apache.commons.codec.binary import Base64
from psdi.iface.router import ExternalExitContext
from psdi.mbo import MboRemote
from psdi.server import MXServer

def execute(context):
    # 从上下文获取当前MBO
    mbo = context.getMbo()
    if not mbo:
        return
    
    # 获取端点配置(替代手动调用Router.getHandler)
    endpoint = context.getEndpoint()
    endpoint_url = endpoint.getProperty("URL")
    http_method = endpoint.getProperty("HTTPMETHOD")
    username = endpoint.getProperty("USERNAME")
    password = endpoint.getProperty("PASSWORD")

    # 构造Basic Auth Header(并更新端点的HEADERS)
    auth_str = f"{username}:{password}"
    auth_base64 = Base64.encodeBase64String(auth_str.encode('ISO-8859-1'))
    existing_headers = endpoint.getProperty("HEADERS")
    updated_headers = f"{existing_headers}\nAuthorization: Basic {auth_base64}"
    endpoint.setProperty("HEADERS", updated_headers)

    # 后续按场景处理逻辑...

分场景需求实现(基于External Exit脚本)

你的所有场景都可以在同一个External Exit脚本里通过分支判断实现,以下是各场景的核心逻辑:

1. 外部创建的SR被取消(SOURCE=LEAKREP,EXTERNALRECID非空)

def handle_canceled_external_sr(mbo):
    # 验证条件
    if mbo.getString("SOURCE") != "LEAKREP" or mbo.isNull("EXTERNALRECID"):
        return None
    
    # 获取最新的logtype=CANCOMM的工作日志
    worklog_set = mbo.getMboSet("WORKLOG")
    worklog_set.setWhere("LOGTYPE='CANCOMM'")
    worklog_set.setSortOrder("REPORTDATE DESC")
    latest_worklog = worklog_set.moveFirst()
    if not latest_worklog:
        return None
    
    # 构造JSON Payload
    payload = JSONObject()
    payload.put("logtype", "CANCOMM")
    payload.put("summary", latest_worklog.getString("DESCRIPTION"))
    payload.put("description_longdescription", latest_worklog.getString("LONGDESCRIPTION"))
    payload.put("userid", mbo.getString("LASTCHANGEDBY"))
    return payload

2. 外部创建的SR被关闭

def handle_closed_external_sr(mbo):
    if mbo.getString("SOURCE") != "LEAKREP" or mbo.isNull("EXTERNALRECID"):
        return None
    
    # 获取最新的logtype≠CANCOMM的工作日志
    worklog_set = mbo.getMboSet("WORKLOG")
    worklog_set.setWhere("LOGTYPE<>'CANCOMM'")
    worklog_set.setSortOrder("REPORTDATE DESC")
    latest_worklog = worklog_set.moveFirst()
    if not latest_worklog:
        return None
    
    payload = JSONObject()
    payload.put("summary", latest_worklog.getString("DESCRIPTION"))
    payload.put("description_longdescription", latest_worklog.getString("LONGDESCRIPTION"))
    return payload

3. 重复工单处理

def handle_duplicate_sr(mbo):
    # 假设你有一个自定义字段标识重复工单,比如DUPLICATEFLAG=Y
    if mbo.getString("DUPLICATEFLAG") != "Y":
        return None
    
    payload = JSONObject()
    # 添加基础字段(按需补充)
    # ...
    # 获取相似TEMPLATEID的未结SR(表域对应的逻辑)
    sr_set = MXServer.getMXServer().getMboSet("SR", mbo.getUserInfo())
    sr_set.setWhere(f"TEMPLATEID='{mbo.getString('TEMPLATEID')}' AND STATUS NOT IN ('CLOSED','CANCELLED') AND SRNUM!='{mbo.getString('SRNUM')}'")
    duplicate_srs = []
    sr = sr_set.moveFirst()
    while sr:
        duplicate_srs.append(sr.getString("SRNUM"))
        sr = sr_set.moveNext()
    payload.put("DUPLICATE", duplicate_srs)
    return payload

4. Maximo原生LEAK类型SR(无SOURCE/EXTERNALRECID)

def handle_native_leak_sr(mbo, context):
    if mbo.getString("SOURCE") is not None or not mbo.isNull("EXTERNALRECID") or mbo.getString("TEMPLATEID") not in ["LEAK","LEAKH","LEAKW"]:
        return None
    
    # 构造包含SR、TKSERVICEADDRESS、WORKLOG的Payload
    payload = JSONObject()
    # SR字段
    payload.put("description", mbo.getString("DESCRIPTION"))
    payload.put("reportdate", mbo.getString("REPORTDATE"))
    payload.put("reportedby", mbo.getString("REPORTEDBY"))
    # TKSERVICEADDRESS字段
    service_addr = mbo.getMboSet("TKSERVICEADDRESS").moveFirst()
    if service_addr:
        payload.put("formattedaddress", service_addr.getString("FORMATTEDADDRESS"))
    # WORKLOG字段(所有存在的工作日志)
    worklogs = []
    worklog_set = mbo.getMboSet("WORKLOG")
    worklog = worklog_set.moveFirst()
    while worklog:
        wl_obj = JSONObject()
        wl_obj.put("description", worklog.getString("DESCRIPTION"))
        wl_obj.put("longdescription", worklog.getString("LONGDESCRIPTION"))
        worklogs.append(wl_obj)
        worklog = worklog_set.moveNext()
    payload.put("worklogs", worklogs)

    # 设置HTTPMETHOD为POST(覆盖端点配置)
    context.getEndpoint().setProperty("HTTPMETHOD", "POST")

    # 处理响应更新SOURCE和EXTERNALRECID(需启用发布通道同步模式)
    response = context.getResponse()
    if response and response.getStatusCode() == 200:
        response_json = JSONObject.parse(response.getInputStream())
        external_ticket_id = response_json.get("ticketid")
        if external_ticket_id:
            mbo.setValue("SOURCE", "LEAKREP")
            mbo.setValue("EXTERNALRECID", external_ticket_id)
            mbo.save()
    return payload

动态更新端点参数

因为你的端点已经勾选了Allow Override,所以可以直接在脚本里修改参数:

  • 修改HTTP方法:context.getEndpoint().setProperty("HTTPMETHOD", "PATCH")(比如在更新外部工单时用PATCH)
  • 添加/修改Headers:如前面的Basic Auth示例,直接拼接现有Headers并更新端点的HEADERS属性

架构选择:单一组件 vs 多组件

推荐使用单一对象结构、发布通道、端点+单个External Exit脚本,原因如下:

  1. 集中维护:所有业务逻辑都在一个脚本里,后续维护人员不需要在多个发布通道/端点之间切换查找逻辑,只需要看一个地方的分支判断。
  2. 减少冗余:避免重复创建相同的对象结构、端点配置,减少系统中的冗余组件。
  3. 逻辑清晰:通过模块化函数+条件分支,每个场景的处理逻辑独立,添加注释后可读性极强。

当然,如果未来某个场景的逻辑变得异常复杂(比如需要完全不同的路由规则、数据转换),再考虑拆分单独的发布通道,但目前你的所有场景都基于同一个对象结构,单一组件方案更优。

内容的提问来源于stack exchange,提问作者Rick

火山引擎 最新活动