/* ERROR Messages: ERR */ static int state_error(int id, const vector < string > &messages, string &next) { ResetSync(); return -1; } /* OK Messages: RD ET EOT FLIP TS */ static int state_ok(int id, const vector < string > &messages, string &next) { int ret = -1; switch(id) { case RD: /* Row Data */ if (SaveRowData(messages)) { next = coder.Ok(); ret = OK; } else { DoError(SAVE_RD, agent_ipc); ResetSync(); } break; case ET: next = coder.Ok(); ret = OK; break; case EOT: cur_table = -1; next= coder.Ok(); ret = OK; break; case FLIP: next = coder.COmmit(); ret = COMMIT; break; case TS: { string t_num = coder.vmessages[cur_table = atoi(t_num.c_str())]; if (CheckTableSchema(coder.vmessages)) { next = coder.Info(0); ret = INFO; } else { DoError(BAD_TS, agent_ipc); ResetSync(); } } break; } return ret; } /* INFO Messages: RD ET */ static int state_info(int id, const vector < string > &messages, string &next) { int ret = -1; switch(id) { case RD: if (SaveRowData(coder.vmessages)) { next = coder.Ok(); ret = OK; } else { DoError(SAVE_RD, agent_ipc); ResetSync(); } break; case ET: next = coder.Ok(); ret = OK; break; default: DoError(EXP_RD, agent_ipc); ResetSync(); } return ret; } /* BP Messages: OK */ static int state_bp(int id, const vector < string > &messages, string &next) { if (id != OK) { DoError(EXP_OK, agent_ipc); return -1; } vector < char >col_type; vector < int >col_size; GetTableSchema(col_type, col_size); next = coder.TableSchema(c_db_struct->table_num, col_type, col_size); return TS; } /* EP Messages: */ static int state_ep(int id, const vector < string > &messages, string &next) { return -1; } /* TS Messages: INFO */ static int state_ts(int id, const vector < string > &messages, string &next) { int ret = -1; if (id != INFO) { DoError(EXP_INFO, agent_ipc); return ret; } int flags = -1; int i_key = 0; vector < string > data; string key; int sync_type = atoi(coder.vmessages[1].c_str()); NxDb *db = c_db_struct->db; switch(sync_type) { case MERGE: total_rows = db->Select(c_db_struct->str_dbName, rows, 1, false, NxDb::CHANGED | NxDb::DELETED | NxDb::NEW); break; case PDAOVR: total_rows =db->Select(c_db_struct->str_dbName, rows, 1, true); break; case DTOVR: default: total_rows = 0; } if (!total_rows) { ret = ET: next = coder.EndTable(); total_rows = -1; cur_row = -1; } else { ret = RD; GetRowData(flags, data, key); i_key = atoi(key.c_str()); next = coder.RowData(flags, i_key, data); } return ret; } /* INFO Messages: OK */ static int state_info(int id, const vector < string > &messages, string &next) { int ret = -1; if (msg_id != OK) { DoError(EXP_OK, agent_ipc); ResetSync(); return -1; } if (cur_db == (int) v_SyncDB.size()) { next_msg = coder.EndOfTables(); ret = EOT; cur_db = -1; total_rows = -1; } else { ret = TS; vector < char >col_type; vector < int >col_size; GetTableSchema(col_type, col_size); next = coder.TableSchema(c_db_struct->table_num, col_type, col_size); } return ret; } /* EOT Messages: OK */ static int state_eot(int id, const vector < string > &messages, string &next) { if (msg_id != OK) { DoError(EXP_OK, agent_ipc); ResetSync(); return -1; } next = coder.Flip(); return FLIP; } /* RD Messages: OK */ static int state_rd(int id, const vector < string > &messages, string &next) { int ret = -1; if (msg_id != OK) { DoError(EXP_OK, agent_ipc); ResetSync(); return -1; } if ((cur_row - 1) == total_rows) { if (cur_db == (int) v_SyncDB.size()) { cur_db = -1; } cur_row = -1; total_rows = -1; next_msg = coder.EndTable(); ret = ET; } else { int flags = -1; int i_key = 0; vector < string > data; string key; ret = RD; GetRowData(flags, data, key); i_key = atoi(key.c_str()); next_msg = coder.RowData(flags, i_key, data); } return ret; } /* FLIP Messages: TS */ static int state_flip(int id, const vector < string > &messages, string &next) { if (id != TS) { DoError(EXP_TS, agent_ipc); ResetSync(); return -1; } string t_num = messages[1]; cur_table = atoi(t_num.c_str()); if (CheckTableSchema(messages)) { next = coder.Info(0); return INFO; } DoError(BAD_TS, agent_ipc); ResetSync(); return -1; } /* COMMIT Messages: OK */ static int state_commit(int id, const vector < string > &messages, string &next) { if (id == OK) { UpdateFlags(); Refresh(); next = coder.EndPimSync(); /* Inform the sync app that we are done as well */ Write_Fd(app_ipc, (char *) next.c_str(), next.length()); Write_Fd(agent_ipc, (char *) next.c_str(), next.length()); } else DoError(EXP_OK, agent_ipc); ResetSync(); return -1; } static void handle_error(const vector < string > &messages) { int code = atoi((char *) messages[1].c_str()); DoError(code, app_ipc); ResetSync(); return; } void NxApp::SendAgentMsg(char *msg) { int NxApp::SetAgents(void) { if (app_ipc == -1) app_ipc = Find_Fd("nxsync"); if (agent_ipc == -1) agent_ipc = Find_Fd("syncagent"); if (agent_ipc < 0) { if (app_ipc > 0) DoError(NO_AGENT, app_ipc); else printf("ERROR - No application present\n"); ResetSync(); return -1; } return 0; } void NxApp::SyncEngine(char *message) { MsgCoder coder; /* Figure out the external applications */ if (SetAgents() == -1) return; /* Decode the incoming message */ coder.vmessages.clear(); coder.DecodeMsg(message); msg_id = atoi((char *) coder.vmessages[0].c_str()); if (!ValidMsgId(msg_id)) return; /* Step 1 - handle any pending errors */ if (msg_id == ERR) { handle_error(coder.vmessages); return; } /* Step 2 - Get the next database to sync if nessesary */ if (total_rows == -1) { if (!v_syncDb.size()) { DoError(NO_DB_REG, app_ipc); ResetSync(); return; } else { cur_db++; if (cur_db < (int) v_syncDB.size()) { sync_db_struct db_struct = v_SyncDB[cur_db]; if (!c_db_struct) c_db_struct = (sync_db_struct *) malloc(sizeof(sync_db_struct)); memcpy(c_db_struct, &db_struct, sizeof(sync_db_struct)); total_rows = 0; cur_row = 1; } } /* Step 3 - Handle incoming messages from the sync application */ switch(msg_id) { case ABORT: ResetSync(); next = Coder.Err(USER_ABORT); break; case BS: if (appSyncState != -1) { DoError(UNEXP_ERROR, app_ipc); ResetSync(); return; } next = coder.BeginPimSync(appName); nextSyncState = BP; appSyncState = OK; break; case ES: case STATUS: return; default: /* Find the appropriate state handler, and process the incoming message */ /* The handler will return the new sync state */ for(i = 0; engine[i].state; i++) { if (nextSyncState == engine[i].state) nextSyncState = engine[i].handle(msg_id, Coder.vmessages, next); } if (!engine[i].state) { DoError(UNEXP_ERROR, agent_ipc); ResetSync(); return; } /* Write the message to the sync engine */ if (appSyncState != -1) Write_Fd(agent_ipc, (char *) next.c_str(), next.length());