@@ -18,6 +18,16 @@ class DbalProducer implements PsrProducer
18
18
*/
19
19
private $ priority ;
20
20
21
+ /**
22
+ * @var int|float|null
23
+ */
24
+ private $ deliveryDelay ;
25
+
26
+ /**
27
+ * @var int|float|null
28
+ */
29
+ private $ timeToLive ;
30
+
21
31
/**
22
32
* @var DbalContext
23
33
*/
@@ -47,6 +57,12 @@ public function send(PsrDestination $destination, PsrMessage $message)
47
57
if (null !== $ this ->priority && null === $ message ->getPriority ()) {
48
58
$ message ->setPriority ($ this ->priority );
49
59
}
60
+ if (null !== $ this ->deliveryDelay && null === $ message ->getDeliveryDelay ()) {
61
+ $ message ->setDeliveryDelay ($ this ->deliveryDelay );
62
+ }
63
+ if (null !== $ this ->timeToLive && null === $ message ->getTimeToLive ()) {
64
+ $ message ->setTimeToLive ($ this ->timeToLive );
65
+ }
50
66
51
67
$ body = $ message ->getBody ();
52
68
if (is_scalar ($ body ) || null === $ body ) {
@@ -58,15 +74,24 @@ public function send(PsrDestination $destination, PsrMessage $message)
58
74
));
59
75
}
60
76
77
+ $ sql = 'SELECT ' .$ this ->context ->getDbalConnection ()->getDatabasePlatform ()->getGuidExpression ();
78
+ $ uuid = $ this ->context ->getDbalConnection ()->query ($ sql )->fetchColumn (0 );
79
+
80
+ if (empty ($ uuid )) {
81
+ throw new \LogicException ('The generated uuid is empty ' );
82
+ }
83
+
61
84
$ dbalMessage = [
85
+ 'id ' => $ uuid ,
86
+ 'published_at ' => (int ) microtime (true ) * 10000 ,
62
87
'body ' => $ body ,
63
88
'headers ' => JSON ::encode ($ message ->getHeaders ()),
64
89
'properties ' => JSON ::encode ($ message ->getProperties ()),
65
90
'priority ' => $ message ->getPriority (),
66
91
'queue ' => $ destination ->getQueueName (),
67
92
];
68
93
69
- $ delay = $ message ->getDelay ();
94
+ $ delay = $ message ->getDeliveryDelay ();
70
95
if ($ delay ) {
71
96
if (!is_int ($ delay )) {
72
97
throw new \LogicException (sprintf (
@@ -79,16 +104,35 @@ public function send(PsrDestination $destination, PsrMessage $message)
79
104
throw new \LogicException (sprintf ('Delay must be positive integer but got: "%s" ' , $ delay ));
80
105
}
81
106
82
- $ dbalMessage ['delayed_until ' ] = time () + $ delay ;
107
+ $ dbalMessage ['delayed_until ' ] = time () + (int ) $ delay / 1000 ;
108
+ }
109
+
110
+ $ timeToLive = $ message ->getTimeToLive ();
111
+ if ($ timeToLive ) {
112
+ if (!is_int ($ timeToLive )) {
113
+ throw new \LogicException (sprintf (
114
+ 'TimeToLive must be integer but got: "%s" ' ,
115
+ is_object ($ timeToLive ) ? get_class ($ timeToLive ) : gettype ($ timeToLive )
116
+ ));
117
+ }
118
+
119
+ if ($ timeToLive <= 0 ) {
120
+ throw new \LogicException (sprintf ('TimeToLive must be positive integer but got: "%s" ' , $ timeToLive ));
121
+ }
122
+
123
+ $ dbalMessage ['time_to_live ' ] = time () + (int ) $ timeToLive / 1000 ;
83
124
}
84
125
85
126
try {
86
127
$ this ->context ->getDbalConnection ()->insert ($ this ->context ->getTableName (), $ dbalMessage , [
128
+ 'id ' => Type::GUID ,
129
+ 'published_at ' => Type::INTEGER ,
87
130
'body ' => Type::TEXT ,
88
131
'headers ' => Type::TEXT ,
89
132
'properties ' => Type::TEXT ,
90
133
'priority ' => Type::SMALLINT ,
91
134
'queue ' => Type::STRING ,
135
+ 'time_to_live ' => Type::INTEGER ,
92
136
'delayed_until ' => Type::INTEGER ,
93
137
]);
94
138
} catch (\Exception $ e ) {
@@ -101,19 +145,17 @@ public function send(PsrDestination $destination, PsrMessage $message)
101
145
*/
102
146
public function setDeliveryDelay ($ deliveryDelay )
103
147
{
104
- if (null === $ deliveryDelay ) {
105
- return ;
106
- }
148
+ $ this ->deliveryDelay = $ deliveryDelay ;
107
149
108
- throw new \ LogicException ( ' Not implemented ' ) ;
150
+ return $ this ;
109
151
}
110
152
111
153
/**
112
154
* {@inheritdoc}
113
155
*/
114
156
public function getDeliveryDelay ()
115
157
{
116
- return null ;
158
+ return $ this -> deliveryDelay ;
117
159
}
118
160
119
161
/**
@@ -139,18 +181,14 @@ public function getPriority()
139
181
*/
140
182
public function setTimeToLive ($ timeToLive )
141
183
{
142
- if (null === $ timeToLive ) {
143
- return ;
144
- }
145
-
146
- throw new \LogicException ('Not implemented ' );
184
+ $ this ->timeToLive = $ timeToLive ;
147
185
}
148
186
149
187
/**
150
188
* {@inheritdoc}
151
189
*/
152
190
public function getTimeToLive ()
153
191
{
154
- return null ;
192
+ return $ this -> timeToLive ;
155
193
}
156
194
}
0 commit comments